Re: Permission to contribute on LZO compression enablement for Beam Java SDK

2019-11-07 Thread Amogh Tiwari
Thanks Luke :)

On Fri, Nov 8, 2019 at 1:01 AM Luke Cwik  wrote:

> Welcome, I have added you as a contributor and assigned the ticket to you.
>
> On Thu, Nov 7, 2019 at 4:21 AM Amogh Tiwari  wrote:
>
>> Hi,
>>
>> I would like to contribute on enabling Apache Beam's java SDK to work
>> with LZO compression. Please add me as a contributor so that I can work on
>> this.
>> I've also raised a ticket
>>  for the same.
>>
>> Thanks and best regards,
>> Amogh Tiwari
>>
>


Re: RabbitMQ and CheckpointMark feasibility

2019-11-07 Thread Reuven Lax
Just to clarify one thing: CheckpointMark does not need to be Java
Seralizable. All that's needed is do return a Coder for the CheckpointMark
in getCheckpointMarkCoder.

On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov  wrote:

> Hi Daniel,
>
> This is probably insufficiently well documented. The CheckpointMark is
> used for two purposes:
> 1) To persistently store some notion of how much of the stream has been
> consumed, so that if something fails we can tell the underlying streaming
> system where to start reading when we re-create the reader. This is why
> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
> 2) To do acks - to let the underlying streaming system know that the Beam
> pipeline will never need data up to this CheckpointMark. Acking does not
> require serializability - runners call ack() on the same in-memory instance
> of CheckpointMark that was produced by the reader. E.g. this makes sense
> for RabbitMq or Pubsub.
>
> In practice, these two capabilities tend to be mutually exclusive: some
> streaming systems can provide a serializable CheckpointMark, some can do
> acks, some can do neither - but very few (or none) can do both, and it's
> debatable whether it even makes sense for a system to provide both
> capabilities: usually acking is an implicit form of streaming-system-side
> checkpointing, i.e. when you re-create the reader you don't actually need
> to carry over any information from an old CheckpointMark - the necessary
> state (which records should be delivered) is maintained on the streaming
> system side.
>
> These two are lumped together into one API simply because that was the
> best design option we came up with (not for lack of trying, but suggestions
> very much welcome - AFAIK nobody is happy with it).
>
> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
> can do acks. So you can simply ignore the non-serializability.
>
> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert 
> wrote:
>
>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>> As part of this I switched to a pull-based API rather than the
>> previously-used push-based. This has caused some nebulous problems so
>> put up a correction PR that I think needs some eyes fairly quickly as
>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>> the upgrade but reverts to the same push-based implementation as in 4.x:
>> https://github.com/apache/beam/pull/9977 )
>>
>> Regardless, in trying to get the pull-based API to work, I'm finding the
>> interactions between rabbitmq and beam with CheckpointMark to be
>> fundamentally impossible to implement so I'm hoping for some input here.
>>
>> CheckointMark itself must be Serializable, presumably this means it gets
>> shuffled around between nodes. However 'Channel', the tunnel through
>> which it communicates with Rabbit to ack messages and finalize the
>> checkpoint, is non-Serializable. Like most other CheckpointMark
>> implementations, Channel is 'transient'. When a new CheckpointMark is
>> instantiated, it's given a Channel. If an existing one is supplied to
>> the Reader's constructor (part of the 'startReader()' interface), the
>> channel is overwritten.
>>
>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>> than the one that consumed them in the first place. Attempting to do so
>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>
>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>> ).
>>
>> Truthfully, I don't really understand how the current implementation is
>> working; it seems like a happy accident. But I'm curious if someone
>> could help me debug and implement how to bridge the
>> re-usable/serializable CheckpointMark requirement in Beam with this
>> limitation of Rabbit.
>>
>> Thanks,
>> -Daniel Robert
>>
>>


Re: [Discuss] Beam mascot

2019-11-07 Thread Reza Rokni
Salmon... they love streams? :-)

On Fri, 8 Nov 2019 at 12:00, Kenneth Knowles  wrote:

> Agree with Aizhamal that it doesn't matter if they are taken if they are
> not too close in space to Beam: Apache projects, big data, log processing,
> stream processing. Not a legal opinion, but an aesthetic opinion. So I
> would keep Lemur as a possibility. Definitely nginx is far away from Beam
> so it seems OK as long as the art is different.
>
> Also FWIW there are many kinds of Lemurs, and also related Tarsier, of the
> only uncontroversial and non-extinct infraorder within
> suborder Strepsirrhini. I think there's enough room for another mascot with
> big glowing eyes :-). The difference in the designer's art will be more
> significant than the taxonomy.
>
> Kenn
>
> On Tue, Nov 5, 2019 at 4:37 PM Aizhamal Nurmamat kyzy 
> wrote:
>
>> Aww.. that Hoover beaver is cute. But then lemur is also "taken" [1] and
>> the owl too [2].
>>
>> Personally, I don't think it matters much which mascots are taken, as
>> long as the project is not too close in the same space as Beam. Also, it's
>> good to just get all ideas out. We should still consider hedgehogs. I
>> looked up fireflies, they don't look nice, but i am not dismissing the idea
>> :/
>>
>> And thanks for reaching out to designers, Max. To your point:
>> >how do we arrive at a concrete design
>> >once we have consensus on the type of mascot?
>> My thinking is that the designer will come up with few sketches, then we
>> vote on one here in the dev@ list.
>>
>> [1]
>> https://www.nginx.com/blog/introducing-the-lemur-stack-and-an-official-nginx-mascot/
>> [2] https://blog.readme.io/why-every-startup-needs-a-mascot/
>>
>> On Tue, Nov 5, 2019 at 5:31 AM Maximilian Michels  wrote:
>>
>>> Quick update: The mentioned designer has gotten back to me and offered
>>> to sketch something until the end of the week. I've pointed him to this
>>> thread and the existing logo material:
>>> https://beam.apache.org/community/logos/
>>>
>>> [I don't want to interrupt the discussion in any way, I just think
>>> having something concrete will help us to eventually decide what we
>>> want.]
>>>
>>> On 05.11.19 12:49, Maximilian Michels wrote:
>>> > How about fireflies in the Beam light rays? ;)
>>> >
>>> >> Feels like "Beam" would go well with an animal that has glowing
>>> bright
>>> >> eyes such as a lemur
>>> >
>>> > I love the lemur idea because it has almost orange eyes.
>>> >
>>> > Thanks for starting this Aizhamal! I've recently talked to a designer
>>> > which is somewhat famous for creating logos. He was inclined to work
>>> on
>>> > a software project logo. Of course there is a little bit of a price
>>> tag
>>> > attached, though the quote sounded reasonable.
>>> >
>>> > It raises the general question, how do we arrive at a concrete design
>>> > once we have consensus on the type of mascot? I believe there are also
>>> > designers working at companies using Beam ;)
>>> >
>>> > Cheers,
>>> > Max
>>> >
>>> > On 05.11.19 06:14, Eugene Kirpichov wrote:
>>> >> Feels like "Beam" would go well with an animal that has glowing
>>> bright
>>> >> eyes (with beams of light shooting out of them), such as a lemur [1]
>>> >> or an owl.
>>> >>
>>> >> [1] https://www.cnn.com/travel/article/madagascar-lemurs/index.html
>>> >>
>>> >> On Mon, Nov 4, 2019 at 7:33 PM Kenneth Knowles >> >> > wrote:
>>> >>
>>> >> Yes! Let's have a mascot!
>>> >>
>>> >> Direct connections often have duplicates. For example in the log
>>> >> processing space, there is
>>> https://www.linkedin.com/in/hooverbeaver
>>> >>
>>> >> I like a flying squirrel, but Flink already is a squirrel.
>>> >>
>>> >> Hedgehog? I could not find any source of confusion for this one.
>>> >>
>>> >> Kenn
>>> >>
>>> >>
>>> >> On Mon, Nov 4, 2019 at 6:02 PM Robert Burke >> >> > wrote:
>>> >>
>>> >> As both a Canadian, and the resident fan of a programming
>>> >> language with a rodent mascot, I endorse this mascot.
>>> >>
>>> >> On Mon, Nov 4, 2019, 4:11 PM David Cavazos <
>>> dcava...@google.com
>>> >> > wrote:
>>> >>
>>> >> I like it, a beaver could be a cute mascot :)
>>> >>
>>> >> On Mon, Nov 4, 2019 at 3:33 PM Aizhamal Nurmamat kyzy
>>> >> mailto:aizha...@apache.org>> wrote:
>>> >>
>>> >> Hi everybody,
>>> >>
>>> >> I think the idea of creating a Beam mascot has been
>>> >> brought up a couple times here in the past, but I
>>> would
>>> >> like us to go through with it this time if we are all
>>> in
>>> >> agreement:)
>>> >>
>>> >> We can brainstorm in this thread what the mascot
>>> should
>>> >> be given Beam’s characteristics and principles. What
>>> do
>>> >> you all think?
>>> >>
>>> >> For example, I am proposing a 

Re: [discuss] More dimensions for the Capability Matrix

2019-11-07 Thread Thomas Weise
FWIW there are currently at least 2 instances of capability matrix [1] [2].

[1] has been in need of a refresh for a while.

[2] is more useful but only covers portable runners and is hard to find.

Thomas

[1] https://beam.apache.org/documentation/runners/capability-matrix/
[2] https://s.apache.org/apache-beam-portability-support-table

On Thu, Nov 7, 2019 at 7:52 PM Pablo Estrada  wrote:

> Hi all,
> I think this is a relatively common question:
>
> - Can I do X with runner Y, and SDK Z?
>
> The answers vary significantly between SDK and Runner pairs. This makes it
> such that the current Capability Matrix falls somewhat short when potential
> users / solutions architects / etc are trying to decide to adopt Beam, and
> which Runner / SDK to use.
>
> I think we need to put some effort in building a capability matrix that
> expresses this information - and maintain it updated.
>
> I would like to discuss a few things:
> - Does it make sense to do this?
> - If it does, what's a good way of doing it? Should we expand the existing
> Capability Matrix to support SDKs as well? Or should we have a new one?
> - Any other thoughts you may have about the idea.
>
> Best
> -P.
>


Re: [Discuss] Beam mascot

2019-11-07 Thread Kenneth Knowles
Agree with Aizhamal that it doesn't matter if they are taken if they are
not too close in space to Beam: Apache projects, big data, log processing,
stream processing. Not a legal opinion, but an aesthetic opinion. So I
would keep Lemur as a possibility. Definitely nginx is far away from Beam
so it seems OK as long as the art is different.

Also FWIW there are many kinds of Lemurs, and also related Tarsier, of the
only uncontroversial and non-extinct infraorder within
suborder Strepsirrhini. I think there's enough room for another mascot with
big glowing eyes :-). The difference in the designer's art will be more
significant than the taxonomy.

Kenn

On Tue, Nov 5, 2019 at 4:37 PM Aizhamal Nurmamat kyzy 
wrote:

> Aww.. that Hoover beaver is cute. But then lemur is also "taken" [1] and
> the owl too [2].
>
> Personally, I don't think it matters much which mascots are taken, as long
> as the project is not too close in the same space as Beam. Also, it's good
> to just get all ideas out. We should still consider hedgehogs. I looked up
> fireflies, they don't look nice, but i am not dismissing the idea :/
>
> And thanks for reaching out to designers, Max. To your point:
> >how do we arrive at a concrete design
> >once we have consensus on the type of mascot?
> My thinking is that the designer will come up with few sketches, then we
> vote on one here in the dev@ list.
>
> [1]
> https://www.nginx.com/blog/introducing-the-lemur-stack-and-an-official-nginx-mascot/
> [2] https://blog.readme.io/why-every-startup-needs-a-mascot/
>
> On Tue, Nov 5, 2019 at 5:31 AM Maximilian Michels  wrote:
>
>> Quick update: The mentioned designer has gotten back to me and offered
>> to sketch something until the end of the week. I've pointed him to this
>> thread and the existing logo material:
>> https://beam.apache.org/community/logos/
>>
>> [I don't want to interrupt the discussion in any way, I just think
>> having something concrete will help us to eventually decide what we want.]
>>
>> On 05.11.19 12:49, Maximilian Michels wrote:
>> > How about fireflies in the Beam light rays? ;)
>> >
>> >> Feels like "Beam" would go well with an animal that has glowing bright
>> >> eyes such as a lemur
>> >
>> > I love the lemur idea because it has almost orange eyes.
>> >
>> > Thanks for starting this Aizhamal! I've recently talked to a designer
>> > which is somewhat famous for creating logos. He was inclined to work on
>> > a software project logo. Of course there is a little bit of a price tag
>> > attached, though the quote sounded reasonable.
>> >
>> > It raises the general question, how do we arrive at a concrete design
>> > once we have consensus on the type of mascot? I believe there are also
>> > designers working at companies using Beam ;)
>> >
>> > Cheers,
>> > Max
>> >
>> > On 05.11.19 06:14, Eugene Kirpichov wrote:
>> >> Feels like "Beam" would go well with an animal that has glowing bright
>> >> eyes (with beams of light shooting out of them), such as a lemur [1]
>> >> or an owl.
>> >>
>> >> [1] https://www.cnn.com/travel/article/madagascar-lemurs/index.html
>> >>
>> >> On Mon, Nov 4, 2019 at 7:33 PM Kenneth Knowles > >> > wrote:
>> >>
>> >> Yes! Let's have a mascot!
>> >>
>> >> Direct connections often have duplicates. For example in the log
>> >> processing space, there is
>> https://www.linkedin.com/in/hooverbeaver
>> >>
>> >> I like a flying squirrel, but Flink already is a squirrel.
>> >>
>> >> Hedgehog? I could not find any source of confusion for this one.
>> >>
>> >> Kenn
>> >>
>> >>
>> >> On Mon, Nov 4, 2019 at 6:02 PM Robert Burke > >> > wrote:
>> >>
>> >> As both a Canadian, and the resident fan of a programming
>> >> language with a rodent mascot, I endorse this mascot.
>> >>
>> >> On Mon, Nov 4, 2019, 4:11 PM David Cavazos <
>> dcava...@google.com
>> >> > wrote:
>> >>
>> >> I like it, a beaver could be a cute mascot :)
>> >>
>> >> On Mon, Nov 4, 2019 at 3:33 PM Aizhamal Nurmamat kyzy
>> >> mailto:aizha...@apache.org>> wrote:
>> >>
>> >> Hi everybody,
>> >>
>> >> I think the idea of creating a Beam mascot has been
>> >> brought up a couple times here in the past, but I would
>> >> like us to go through with it this time if we are all
>> in
>> >> agreement:)
>> >>
>> >> We can brainstorm in this thread what the mascot should
>> >> be given Beam’s characteristics and principles. What do
>> >> you all think?
>> >>
>> >> For example, I am proposing a beaver as a mascot,
>> >> because:
>> >> 1. Beavers build dams out of logs for streams
>> >> 2. The name is close to Beam
>> >> 3. And with the right imagination, you can make a
>> really
>> >>  

[discuss] More dimensions for the Capability Matrix

2019-11-07 Thread Pablo Estrada
Hi all,
I think this is a relatively common question:

- Can I do X with runner Y, and SDK Z?

The answers vary significantly between SDK and Runner pairs. This makes it
such that the current Capability Matrix falls somewhat short when potential
users / solutions architects / etc are trying to decide to adopt Beam, and
which Runner / SDK to use.

I think we need to put some effort in building a capability matrix that
expresses this information - and maintain it updated.

I would like to discuss a few things:
- Does it make sense to do this?
- If it does, what's a good way of doing it? Should we expand the existing
Capability Matrix to support SDKs as well? Or should we have a new one?
- Any other thoughts you may have about the idea.

Best
-P.


Re: RabbitMQ and CheckpointMark feasibility

2019-11-07 Thread Eugene Kirpichov
Hi Daniel,

This is probably insufficiently well documented. The CheckpointMark is used
for two purposes:
1) To persistently store some notion of how much of the stream has been
consumed, so that if something fails we can tell the underlying streaming
system where to start reading when we re-create the reader. This is why
CheckpointMark is Serializable. E.g. this makes sense for Kafka.
2) To do acks - to let the underlying streaming system know that the Beam
pipeline will never need data up to this CheckpointMark. Acking does not
require serializability - runners call ack() on the same in-memory instance
of CheckpointMark that was produced by the reader. E.g. this makes sense
for RabbitMq or Pubsub.

In practice, these two capabilities tend to be mutually exclusive: some
streaming systems can provide a serializable CheckpointMark, some can do
acks, some can do neither - but very few (or none) can do both, and it's
debatable whether it even makes sense for a system to provide both
capabilities: usually acking is an implicit form of streaming-system-side
checkpointing, i.e. when you re-create the reader you don't actually need
to carry over any information from an old CheckpointMark - the necessary
state (which records should be delivered) is maintained on the streaming
system side.

These two are lumped together into one API simply because that was the best
design option we came up with (not for lack of trying, but suggestions very
much welcome - AFAIK nobody is happy with it).

RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
can do acks. So you can simply ignore the non-serializability.

On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert  wrote:

> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
> As part of this I switched to a pull-based API rather than the
> previously-used push-based. This has caused some nebulous problems so
> put up a correction PR that I think needs some eyes fairly quickly as
> I'd consider master to be broken for rabbitmq right now. The PR keeps
> the upgrade but reverts to the same push-based implementation as in 4.x:
> https://github.com/apache/beam/pull/9977 )
>
> Regardless, in trying to get the pull-based API to work, I'm finding the
> interactions between rabbitmq and beam with CheckpointMark to be
> fundamentally impossible to implement so I'm hoping for some input here.
>
> CheckointMark itself must be Serializable, presumably this means it gets
> shuffled around between nodes. However 'Channel', the tunnel through
> which it communicates with Rabbit to ack messages and finalize the
> checkpoint, is non-Serializable. Like most other CheckpointMark
> implementations, Channel is 'transient'. When a new CheckpointMark is
> instantiated, it's given a Channel. If an existing one is supplied to
> the Reader's constructor (part of the 'startReader()' interface), the
> channel is overwritten.
>
> *However*, Rabbit does not support 'ack'ing messages on a channel other
> than the one that consumed them in the first place. Attempting to do so
> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>
> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
> ).
>
> Truthfully, I don't really understand how the current implementation is
> working; it seems like a happy accident. But I'm curious if someone
> could help me debug and implement how to bridge the
> re-usable/serializable CheckpointMark requirement in Beam with this
> limitation of Rabbit.
>
> Thanks,
> -Daniel Robert
>
>


Re: Pipeline AttributeError on Python3

2019-11-07 Thread Valentyn Tymofieiev
 I think we have heard of this issue from the same source:

This looks exactly like a race condition that we've encountered on Python
> 3.7.1: There's a bug in some older 3.7.x releases that breaks the
> thread-safety of the unpickler, as concurrent unpickle threads can access a
> module before it has been fully imported. See
> https://bugs.python.org/issue34572 for more information.
>
> The traceback shows a Python 3.6 venv so this could be a different issue
> (the unpickle bug was introduced in version 3.7). If it's the same bug then
> upgrading to Python 3.7.3 or higher should fix that issue. One potential
> workaround is to ensure that all of the modules get imported during the
> initialization of the sdk_worker, as this bug only affects imports done by
> the unpickler.


The symptoms do sound similar, so I would try to reproduce your issue on
3.7.3 and see if it is gone, or try to reproduce
https://bugs.python.org/issue34572 in the version of interpreter you use.
If this doesn't help, you can try to reproduce the race using your input.

To get the output of serialized do fn, you could do the following:
1. Patch https://github.com/apache/beam/pull/10036.
2. Set logging level to DEBUG, see:
https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
.
3. Check for log output for payload of your transform, it may look like:

transforms {
  key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
  value {
spec {
  urn: "beam:transform:pardo:v1"
  payload: "\n\347\006\n\275\006\n
beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT


Then you can extract the output of pickled fn:

from apache_beam.utils import proto_utils
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.internal import pickler

payload = b'\n\347\006\n\275\006\n
beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
pardo_payload = proto_utils.parse_Bytes(x, beam_runner_api_pb2.ParDoPayload)
pickled_fn = pardo_payload.do_fn.spec.payload

pickler.loads(pickle_fn) # Presumably the race happens here when unpickling
one of your transforms (pricingrealtime.aggregation.aggregation_transform).


On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar  wrote:

> Thanks Valentyn,
>
> Aggregation_transform.py doesn't have any transformation method which
> extends beam.DoFn. We are using plain python method which we passed in
> beam.Map().  I am not sure how to get the dump of serialized_fn. Can you
> please let me the process?
>
> I also heard that some people ran into this issue on Python 3.7.1 but the
> same issue is not present on Python 3.7.3. Can you confirm this?
>
>
>
> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev 
> wrote:
>
>> +user@, bcc: dev@
>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>> this issue, although we saw instances of this bug in exactly opposite
>> scenarios - when pipeline was defined *in one file*, but not in multiple
>> files.
>>
>> Could you try replacing instances of super() in aggregation_transform.py
>> as done in https://github.com/apache/beam/pull/9513 and see if this
>> issue is still reproducible?
>>
>> If that doesn't work, I would try to get the dump of serialized_fn, and
>> try to reproduce the issue in isolated environment, such as:
>>
>> form apache_beam.internal import pickler
>> serialized_fn = "..content.."
>> pickler.loads(serialized_fn)
>>
>> then I would try to trim the doFn in the example to a
>> minimally-reproducible example. It could be another issue with dill
>> dependency.
>>
>>
>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar 
>> wrote:
>>
>>> Hi All,
>>>
>>> We have noticed a weird intermittent issue on Python3 but we don't run
>>> into this issue on python2. Sometimes when we are trying to submit the
>>> pipeline, we get AttributeError (Check the stack trace below).  we have
>>> double-checked and we do find the attribute/methods are present in the
>>> right module and in right place but somehow the pipeline still complains
>>> about it. In some cases, we refer methods before their definition. We tried
>>> to reorder the method definition but that didn't help at all.
>>>
>>> We don't see the same issue when the entire pipeline is defined in one
>>> file. Also, note that this doesn't happen all the time when we submit the
>>> pipeline, so I feel it is some kind of race condition. When we enable the
>>> worker recycle logic it happens most of the time when sdk worker is
>>> recycled.
>>>
>>> Some more information about the environment:
>>> Python version: 3
>>> Beam version: 2.16
>>> Flink version: 1.8
>>>
>>> *Stack trace: *
>>>
>>>- :
>>>
>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>> bundle}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>> at
>>> 

Re: Cython unit test suites running without Cythonized sources

2019-11-07 Thread Chad Dombrova
Hi,
Answers inline below,

It's unclear from the nose source[1] whether it's calling build_py
>> and build_ext, or just build_ext.  It's also unclear whether the result of
>> that build is actually used.  When python setup.py nosetests runs, it runs
>> inside of a virtualenv created by tox, and tox has already installed beam
>> into that venv.  It seems unlikely to me that build_ext or build_py is
>> going to install over top of the beam package installed by tox, but who
>> knows, it may end up first on the path.  Also recall that there is an sdist
>> gradle task running before tox that creates a tarball that is passed to
>> run_tox.sh which passes it along to tox --installpkg flag[2] so that tox
>> won't build beam itself.
>>
>
> I believe the build step is executed as expected and during installation
> results in cythonized package to be installed. This could be verified by,
> in a new virtual environment creating a source distribution, installing
> cython, then installing the source distribution. Resulting installation
> does have the .so files. This is done before running nosetests.
>

Even if it *is* working, I think it's a pretty poor design that we build it
once in sdist and then rebuild it again with nose.  It's very obfuscated
and brittle, hence we're still debating the probable outcome.  We should
choose one place to build and that should either be the sdist gradle task
or tox, not the test command.


> We should designate a single place that always does the build.  I thought
>> that was supposed to be the gradle sdist task, but invoking nose via
>> `python setup.py` means that we're introducing the possibility that a build
>> is occurring which may or may not be used, depending on the entirely
>> unclear dependencies of the setup commands, and the entirely unclear
>> relationship of that build output to the tox venv.  As a first step of
>> clarity, we could stop invoking nose using `python setup.py nosetests`, and
>> instead use `nosetests` (and in the future `pytest`).  I think the reason
>> for `python setup.py nosetest` is to ensure the test requirements are
>> installed,
>>
>
> I believe the reason we are invokign nosetest this way is related to the
> beam testing plugin. It is configure in setup.py. The behavior is
> documented here: https://nose.readthedocs.io/en/latest/api/commands.html
>

It is possible to register a custom plugin without using setup.py:
https://nose.readthedocs.io/en/latest/plugins/writing.html#registering-a-plugin-without-setuptools

Since we're on the verge of switching to pytest, perhaps we should
investigate switching that over to to not use setup.py instead of chasing
our tails with nose.


> but we could shift those to a separate requirements file or use
>> extra_requires and tox can ensure that they're installed.  I find these two
>> options to be pretty common patterns [3].
>>
>
> We do use extras is tox already. GCP tests work this way by installing
> additional GCP package. In my opinion, letting tox to setup the virtual
> environment either from the package or from setup.py is a better option
> than using requirements file. Otherwise we would need a way to keep
> setup.py and requirements file in sync.
>

Oh yeah, I see that the tests already are an extra package.  Well, that'll
make it that much easier to stop using `python setup.py nosetests`.

-chad


Re: Cython unit test suites running without Cythonized sources

2019-11-07 Thread Ahmet Altay
On Thu, Nov 7, 2019 at 1:37 PM Chad Dombrova  wrote:

>
> On Thu, Nov 7, 2019 at 11:31 AM Robert Bradshaw 
> wrote:
>
>> Does python setup.py nosetests invoke build_ext (or, more generally,
>> build)?
>
>
> It's unclear from the nose source[1] whether it's calling build_py
> and build_ext, or just build_ext.  It's also unclear whether the result of
> that build is actually used.  When python setup.py nosetests runs, it runs
> inside of a virtualenv created by tox, and tox has already installed beam
> into that venv.  It seems unlikely to me that build_ext or build_py is
> going to install over top of the beam package installed by tox, but who
> knows, it may end up first on the path.  Also recall that there is an sdist
> gradle task running before tox that creates a tarball that is passed to
> run_tox.sh which passes it along to tox --installpkg flag[2] so that tox
> won't build beam itself.
>

I believe the build step is executed as expected and during installation
results in cythonized package to be installed. This could be verified by,
in a new virtual environment creating a source distribution, installing
cython, then installing the source distribution. Resulting installation
does have the .so files. This is done before running nosetests.


> We should designate a single place that always does the build.  I thought
> that was supposed to be the gradle sdist task, but invoking nose via
> `python setup.py` means that we're introducing the possibility that a build
> is occurring which may or may not be used, depending on the entirely
> unclear dependencies of the setup commands, and the entirely unclear
> relationship of that build output to the tox venv.  As a first step of
> clarity, we could stop invoking nose using `python setup.py nosetests`, and
> instead use `nosetests` (and in the future `pytest`).  I think the reason
> for `python setup.py nosetest` is to ensure the test requirements are
> installed,
>

I believe the reason we are invokign nosetest this way is related to the
beam testing plugin. It is configure in setup.py. The behavior is
documented here: https://nose.readthedocs.io/en/latest/api/commands.html


> but we could shift those to a separate requirements file or use
> extra_requires and tox can ensure that they're installed.  I find these two
> options to be pretty common patterns [3].
>

We do use extras is tox already. GCP tests work this way by installing
additional GCP package. In my opinion, letting tox to setup the virtual
environment either from the package or from setup.py is a better option
than using requirements file. Otherwise we would need a way to keep
setup.py and requirements file in sync. This is also doable.


>
> [1] https://github.com/nose-devs/nose/blob/master/nose/commands.py#L113
> [2]
> https://tox.readthedocs.io/en/latest/config.html#cmdoption-tox-installpkg
> [3]
> https://stackoverflow.com/questions/29870629/pip-install-test-dependencies-for-tox-from-setup-py
>
>
>
>> It's possible cython is present, but the build step is not
>> invoked which would explain the skip for slow_coders_test. The correct
>> test is being used in
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/fast_coders_test.py#L34
>>
>> On Thu, Nov 7, 2019 at 11:20 AM Ahmet Altay  wrote:
>> >
>> > I believe tox is correctly installing cython and executes "python
>> setup.py nosetests" which triggers cythonzation path inside setup.py. Some
>> indications that cython is installed and used is the following log entries
>> (from a recent precommit cron job [1])
>> > - [ 1/12] Cythonizing apache_beam/coders/coder_impl.py
>> > - Errors with cython.cast in the stack traces.
>> > - Tests skipped with: test_using_slow_impl
>> (apache_beam.coders.slow_coders_test.SlowCoders) ... SKIP: Found cython,
>> cannot test non-compiled implementation.
>> >
>> > At the same time there are log entries as following:
>> > - test_using_fast_impl (apache_beam.coders.fast_coders_test.FastCoders)
>> ... SKIP: Cython is not installed
>> >
>> > It might be an issue with what these tests are suing to check whether
>> they are cythonized or not. We seem to have at least 2 different versions
>> of this check [2][3]. Maybe we need to converge on one (former?).
>> >
>> > Ahmet
>> >
>> > [1]
>> https://builds.apache.org/view/A-D/view/Beam/view/All/job/beam_PreCommit_Python_Cron/2008/consoleFull
>> > [2]
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/slow_coders_test.py#L32
>> > [3]
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/tools/utils.py#L33
>> >
>> >
>> >
>> > On Wed, Nov 6, 2019 at 6:19 PM Chad Dombrova  wrote:
>> >>
>> >> Another potential solution would be to _not_ use the sdist task to
>> build the tarball and let tox do it.  Tox should install cython on
>> supported platforms before running sdist itself (which it does by default
>> unless you explicitly provide it with a tarball, which we are doing).  This
>> has the added benefit of 

Re: Feature addition to java CassandraIO connector

2019-11-07 Thread Vincent Marquez
Thanks for the response Pablo.  We're currently using our own custom ParDo
connector for Cassandra (specialized to Scylla's sharding algorithm) that
has a 'readAll' type option and getting great results.   Would you be up
for taking an outside contribution that refactors the current CassandraIO
connector to be of the  PTransform/ParDo kind? I'm happy to give it a shot
in the next week or so and send a PR on github.  My username is vmarquez on
both ASF and gh, I'm also fine with writing up a JIRA describing how I'd
want the more flexible connector to look.


--Vincent

On Wed, Oct 16, 2019 at 11:20 AM Pablo Estrada  wrote:

> Hi Vincent,
> I think it makes sense to have some sort of `readAll` for CassandraIO that
> can receive multiple queries, and execute each one of them. This would also
> be consistent with other IOs that we have such as FileIOs.
> I suspect that doing this may require rearchitecting the whole IO from a
> BoundedSource-based one to a ParDo-based one - so a large change; and we'd
> need to make sure that we don't lose scalability due to that change.
>
> Adding Ismael/JB/Etienne who've done a lot of the work on CassandraIO.
> Thoughts?
> -P.
>
>
> On Mon, Oct 14, 2019 at 3:32 PM Vincent Marquez 
> wrote:
>
>> Hello Pablo, thank you for the response, and apologies for the delay.  I
>> had some work and also wanted to prove out what I was proposing with our
>> own code at my workplace.
>>
>> Here is a small gist of what I'm proposing.
>>
>> https://gist.github.com/vmarquez/204b8f44b1279fdbae97b40f8681bc25
>>
>> I'm happy to explain more or even write up an official design doc if you
>> think that would be helpful explaining things.
>>
>> --Vincent
>>
>> On 2019/10/04 18:03:23, Pablo Estrada  wrote:
>> > Hi Vincent!>
>> > Do you think you could add some code snippets / pseudocode as to what
>> this>
>> > looks like? Feel free to do it on email, gist, google doc, etc?>
>> > Best>
>> > -P.>
>> >
>> > On Thu, Oct 3, 2019 at 4:16 PM Vincent Marquez >
>> > wrote:>
>> >
>> > > Currently the CassandraIO connector allows a user to specify a table,
>> and>
>> > > the CassandraSource object generates a list of queries based on
>> token>
>> > > ranges of the table, along with grouping them by the token ranges.>
>> > >>
>> > > I often need to run (generated, sometimes a million+) queries against
>> a>
>> > > subset of a table.  Instead of providing a filter, it is easier and
>> much>
>> > > more performant to supply a collection of queries along with their
>> tokens>
>> > > to both partition and group by, instead of letting CassandraIO
>> naively run>
>> > > over the entire table or with a simple filter.>
>> > >>
>> > > I propose in addition to the current method of supplying a table and>
>> > > filter, also allowing the user to pass in a collection of queries
>> and>
>> > > tokens.   The current way CassandraSource breaks up the table could
>> be>
>> > > modified to build on top of the proposed implementation to reduce
>> code>
>> > > duplication as well.  If this sounds like an acceptable alternative
>> way of>
>> > > using the CassandraIO connector, I don't mind giving it a shot with a
>> pull>
>> > > request.>
>> > >>
>> > > If there is a better way of doing this, I'm eager to hear and learn.>
>> > > Thanks for reading!>
>> > >>
>> >
>
>

-- 
*-Vincent*


Re: Contributor permission for Beam Jira tickets

2019-11-07 Thread Changming Ma
Oh, one more thing: my jira account name is: cmma



On Thu, Nov 7, 2019 at 3:04 PM Changming Ma  wrote:

> Hi,
> This is Changming, a SWE with Google. I'm working on a GCP DataFlow item
> and it'll be nice some of my changes can be backported to beam (e.g.,
> BEAM-8579).
> Could someone please add me as a contributor for Beam's Jira issue
> tracker? My github account is cmm08 (email: c...@google.com).
>
> Thank you,
> Changming
>


Re: New Contributor

2019-11-07 Thread Kyle Weaver
Can you please share your Jira username?

On Thu, Nov 7, 2019 at 3:04 PM Andrew Crites 
wrote:

> This is Andrew Crites. I'm making some changes to the Python Dataflow
> runner. Can someone add me as a contributor for Beam's Jira issue tracker?
> Apparently I can't be assigned issues right now.
>
> Thanks!
>


Contributor permission for Beam Jira tickets

2019-11-07 Thread Changming Ma
Hi,
This is Changming, a SWE with Google. I'm working on a GCP DataFlow item
and it'll be nice some of my changes can be backported to beam (e.g.,
BEAM-8579).
Could someone please add me as a contributor for Beam's Jira issue tracker?
My github account is cmm08 (email: c...@google.com).

Thank you,
Changming


New Contributor

2019-11-07 Thread Andrew Crites
This is Andrew Crites. I'm making some changes to the Python Dataflow
runner. Can someone add me as a contributor for Beam's Jira issue tracker?
Apparently I can't be assigned issues right now.

Thanks!


Re: Confusing multiple output semantics in Python

2019-11-07 Thread Ning Kang
Hi Sam,

Thanks for clarifying the accessor to output when building a pipeline.

Internally, we have AppliedPTransform, where the output is always a
dictionary:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py#L770
And it seems to me that with key 'None', the output will be the main output.

Ning.

On Thu, Nov 7, 2019 at 1:39 PM Sam Rohde  wrote:

> Hi All,
>
> In the Python SDK there are three ways of representing the output of a
> PTransform with multiple PCollections:
>
>- dictionary: PCollection tag --> PCollection
>- tuple: index --> PCollection
>- DoOutputsTuple: tag, index, or field name --> PCollection
>
> I find this inconsistent way of accessing multiple outputs to be
> confusing. Say that you have an arbitrary PTransform with multiple outputs.
> How do you know how to access an individual output without looking at the
> source code? *You can't!* Remember there are three representations of
> multiple outputs. So, you need to look at the output type and determine
> what the output actually is.
>
> What purpose does it serve to have three different ways of representing a
> single concept of multiple output PCollections?
>
> My proposal is to have a single representation analogous to Java's
> PCollectionTuple. With this new type you will able to access PCollections
> by tag with the "[ ]" operator or by field name. It should also up-convert
> returned tuples, dicts, and DoOutputsTuples from composites into this new
> type.
>
> Full example:
>
> class SomeCustomComposite(PTransform):
>   def expand(self, pcoll):
> def my_multi_do_fn(x):
>   if isinstance(x, int):
> yield pvalue.TaggedOutput('number', x)
>   if isinstance(x, str):
> yield pvalue.TaggedOutput('string', x)
>
> def printer(x):
>   print(x)
>   yield x
>
> outputs = pcoll | beam.ParDo(my_multi_do_fn).with_outputs()*return 
> pvalue.PTuple({
> 'number': output.number | beam.ParDo(printer),
> 'string': output.string | beam.ParDo(printer)
> })*
>
> p = beam.Pipeline()
> *main = p | SomeCustomComposite()*
>
> # Access PCollection by field name.
> numbers = *main.number* | beam.ParDo(...)
>
> # Access PCollection by tag.
> strings = *main['string']* | beam.ParDo(...)
>
> What do you think? Does this clear up the confusion of using multiple
> output PCollections in Python?
>
> Regards,
> Sam
>


Re: Deprecate some or all of TestPipelineOptions?

2019-11-07 Thread Ismaël Mejía
Thanks for bringing this to the ML Brian

+1 For full TestPipelineOptions deprecation. Even worth to remove it,
bad part is that this class resides in 'sdks/core/main/java' and not
in testing as I imagined so this could count as a 'breaking' change.

On Thu, Nov 7, 2019 at 8:27 PM Luke Cwik  wrote:
>
> There was issue with asynchrony of p.run(), some runners blocked till the 
> pipeline was complete with p.run() which was never meant to be the intent.
>
> The test timeout one makes sense to be able to configure it per runner (since 
> Dataflow takes a lot longer than other runners) but we may be able to 
> configure a Junit test timeout attribute instead.
>
> I would be for getting rid of them.
>
>
> On Wed, Nov 6, 2019 at 3:36 PM Robert Bradshaw  wrote:
>>
>> +1 to all of these are probably obsolete at this point and would be
>> nice to remove.
>>
>>
>> On Wed, Nov 6, 2019 at 3:00 PM Kenneth Knowles  wrote:
>> >
>> > Good find. I think TestPipelineOptions is from very early days. It makes 
>> > sense to me that these are all obsolete. Some guesses, though I haven't 
>> > dug through commit history to confirm:
>> >
>> >  - TempRoot: a while ago TempLocation was optional, so I think this would 
>> > provide a default for things like gcpTempLocation and stagingLocation
>> >  - OnSuccessMatcher: for runners where pipeline used to not terminate in 
>> > streaming mode. Now I think every runner can successfully waitUntilFinish. 
>> > Also the current API for waitUntilFinish went through some evolutions 
>> > around asynchrony so it wasn't always a good choice.
>> >  - OnCreateMatcher: just for symmetry? I don't know
>> >  - TestTimeoutSeconds: probably also for the asychrony/waitUntilfinish 
>> > issue
>> >
>> > Kenn
>> >
>> > On Wed, Nov 6, 2019 at 12:19 PM Brian Hulette  wrote:
>> >>
>> >> I recently came across TestPipelineOptions, and now I'm wondering if 
>> >> maybe it should be deprecated. It only seems to actually be supported for 
>> >> Spark and Dataflow (via TestSparkRunner and TestDataflowRunner), and I 
>> >> think it may make more sense to move the functionality it provides into 
>> >> the tests that need it.
>> >>
>> >> TestPipelineOptions currently has four attributes:
>> >>
>> >> # TempRoot
>> >> It's purpose isn't documented, but many tests read TempRoot and use it to 
>> >> set a TempLocation (example). I think this attribute makes sense (e.g. we 
>> >> can set TempRoot once and each test has its own subdirectory), but I'm 
>> >> not sure. Can anyone confirm the motivation for it? I'd like to at least 
>> >> add a docstring for it.
>> >>
>> >> # OnCreateMatcher
>> >> A way to register a matcher that will be checked right after a pipeline 
>> >> has started. It's never set except for in TestDataflowRunnerTest, so I 
>> >> think this is absolutely safe to remove.
>> >>
>> >> # OnSuccessMatcher
>> >> A way to register a matcher that will be checked right after a pipeline 
>> >> has successfully completed. This is used in several tests 
>> >> (RequiresStableInputIT, WordCountIT, ... 8 total occurrences), but I 
>> >> don't see why they couldn't all be replaced with a 
>> >> `p.run().waitUntilFinish()`, followed by an assert.
>> >>
>> >> I think the current approach is actually dangerous, because running these 
>> >> tests with runners other than TestDataflowRunner or TestSparkRunner means 
>> >> the matchers are never actually checked. This is actually how I came 
>> >> across TestPipelineOptions - I tried running a test with the DirectRunner 
>> >> and couldn't make it fail.
>> >>
>> >> # TestTimeoutSeconds
>> >> Seems to just be a wrapper for `waitUntilFinish(duration)`, and only used 
>> >> in one place. I think it would be cleaner for the test to be responsible 
>> >> for calling waitUntilFinish (which we do elsewhere), the only drawback is 
>> >> it requires a small refactor so the test has access to the PipelineResult 
>> >> object.
>> >>
>> >>
>> >> So I have a couple of questions for the community
>> >> 1) Are there thoughts on TempRoot? Can we get rid of it?
>> >> 2) Are there any objections to removing the other three attributes? Am I 
>> >> missing something? Unless there are any objections I think I'll write a 
>> >> patch to remove them.
>> >>
>> >> Thanks,
>> >> Brian


Re: Triggers still finish and drop all data

2019-11-07 Thread Steve Niemitz
Interestingly enough, we just had a use case come up that I think could
have been solved by finishing triggers.

Basically, we want to emit a notification when a certain threshold is
reached (in this case, we saw at least N elements for a given key), and
then never notify again within that window.  As mentioned, we can
accomplish this using a stateful DoFn as mentioned above, but I thought it
was interesting that this just came up, and wanted to share.

Maybe it'd be worth building something to simulate this into the SDK?

On Mon, Nov 4, 2019 at 8:15 PM Kenneth Knowles  wrote:

> By the way, adding this guard uncovered two bugs in Beam's Java codebase,
> luckily only benchmarks and tests. There were *no* non-buggy instances of a
> finishing trigger. They both declare allowed lateness that is never used.
>
> Nexmark query 10:
>
> // Clear fancy triggering from above.
> .apply(
> Window.>into(...)
> .triggering(AfterWatermark.pastEndOfWindow())
> // We expect no late data here, but we'll assume the worst
> so we can detect any.
> .withAllowedLateness(Duration.standardDays(1))
> .discardingFiredPanes())
>
> This is nonsensical: the trigger will fire once and close, never firing
> again. So the allowed lateness has no effect except to change counters from
> "dropped due to lateness" to "dropped due to trigger closing". The intent
> would appear to be to restore the default triggering, but it failed.
>
> PipelineTranslationTest:
>
>Window.into(FixedWindows.of(Duration.standardMinutes(7)))
> .triggering(
> AfterWatermark.pastEndOfWindow()
>
> .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
> .accumulatingFiredPanes()
> .withAllowedLateness(Duration.standardMinutes(3L)));
>
> Again, the allowed lateness has no effect. This test is just to test
> portable proto round-trip. But still it is odd to write a nonsensical
> pipeline for this.
>
> Takeaway: experienced Beam developers never use this pattern, but they
> still get it wrong and create pipelines that would have data loss bugs
> because of it.
>
> Since there is no other discussion here, I will trust the community is OK
> with this change and follow Jan's review of my implementation of his idea.
>
> Kenn
>
>
> On Thu, Oct 31, 2019 at 4:06 PM Kenneth Knowles  wrote:
>
>> Opened https://github.com/apache/beam/pull/9960 for this idea. This will
>> alert users to broken pipelines and force them to alter them.
>>
>> Kenn
>>
>> On Thu, Oct 31, 2019 at 2:12 PM Kenneth Knowles  wrote:
>>
>>> On Thu, Oct 31, 2019 at 2:11 AM Jan Lukavský  wrote:
>>>
 Hi Kenn,

 does there still remain some use for trigger to finish? If we don't
 drop
 data, would it still be of any use to users? If not, would it be better
 to just remove the functionality completely, so that users who use it
 (and it will possibly break for them) are aware of it at compile time?

 Jan

>>>
>>> Good point. I believe there is no good use for a top-level trigger
>>> finishing. As mentioned, the intended uses aren't really met by triggers,
>>> but are met by stateful DoFn.
>>>
>>> Eugene's bug even has this title :-). We could not change any behavior
>>> but just reject pipelines with broken top-level triggers. This is probably
>>> a better solution. Because if a user has a broken trigger, the new behavior
>>> is probably not enough to magically fix their pipeline. They are better off
>>> knowing that they are broken and fixing it.
>>>
>>> And at that point, there is a lot of dead code and my PR is really just
>>> cleaning it up as a simplification.
>>>
>>> Kenn
>>>
>>>
>>>
 On 10/30/19 11:26 PM, Kenneth Knowles wrote:
 > Problem: a trigger can "finish" which causes a window to "close" and
 > drop all remaining data arriving for that window.
 >
 > This has been discussed many times and I thought fixed, but it seems
 > to not be fixed. It does not seem to have its own Jira or thread that
 > I can find. But here are some pointers:
 >
 >  - data loss bug:
 >
 https://lists.apache.org/thread.html/ce413231d0b7d52019668765186ef27a7ffb69b151fdb34f4bf80b0f@%3Cdev.beam.apache.org%3E
 >  - user hitting the bug:
 >
 https://lists.apache.org/thread.html/28879bc80cd5c7ef1a3e38cb1d2c063165d40c13c02894bbccd66aca@%3Cuser.beam.apache.org%3E
 >  - user confusion:
 >
 https://lists.apache.org/thread.html/2707aa449c8c6de1c6e3e8229db396323122304c14931c44d0081449@%3Cuser.beam.apache.org%3E
 >  - thread from 2016 on the topic:
 >
 https://lists.apache.org/thread.html/5f44b62fdaf34094ccff8da2a626b7cd344d29a8a0fff6eac8e148ea@%3Cdev.beam.apache.org%3E
 >
 > In theory, trigger finishing was intended for users who can get their
 > answers from a smaller amount of data and then drop the rest. In
 > practice, triggers aren't 

Re: [Discuss] Beam Summit 2020 Dates & locations

2019-11-07 Thread Alex Van Boxel
For date wise, I'm wondering why we should switching the Europe and NA one,
this would mean that the Berlin and the new EU summit would be almost 1.5
years apart.

 _/
_/ Alex Van Boxel


On Thu, Nov 7, 2019 at 8:43 PM Ahmet Altay  wrote:

> I prefer bay are for NA summit. My reasoning is that there is a criticall
> mass of contributors and users in that location, probably more than
> alternative NA locations. I was not involved with planning recently and I
> do not know if there were people who could attend due to location
> previously. If that is the case, I agree with Elliotte on looking for other
> options.
>
> Related to dates: March (Asia) and mid-May (NA) dates are a bit close.
> Mid-June for NA might be better to spread events. Other pieces looks good.
>
> Ahmet
>
> On Thu, Nov 7, 2019 at 7:09 AM Elliotte Rusty Harold 
> wrote:
>
>> The U.S. sadly is not a reliable destination for international
>> conferences these days. Almost every conference I go to, big and
>> small, has at least one speaker, sometimes more, who can't get into
>> the country. Canada seems worth considering. Vancouver, Montreal, and
>> Toronto are all convenient.
>>
>> On Wed, Nov 6, 2019 at 2:17 PM Griselda Cuevas  wrote:
>> >
>> > Hi Beam Community!
>> >
>> > I'd like to kick off a thread to discuss potential dates and venues for
>> the 2020 Beam Summits.
>> >
>> > I did some research on industry conferences happening in 2020 and
>> pre-selected a few ranges as follows:
>> >
>> > (2 days) NA between mid-May and mid-June
>> > (2 days) EU mid October
>> > (1 day) Asia Mini Summit:  March
>> >
>> > I'd like to hear your thoughts on these dates and get consensus on
>> exact dates as the convo progresses.
>> >
>> > For locations these are the options I reviewed:
>> >
>> > NA: Austin Texas, Berkeley California, Mexico City.
>> > Europe: Warsaw, Barcelona, Paris
>> > Asia: Singapore
>> >
>> > Let the discussion begin!
>> > G (on behalf of the Beam Summit Steering Committee)
>> >
>> >
>> >
>>
>>
>> --
>> Elliotte Rusty Harold
>> elh...@ibiblio.org
>>
>


Confusing multiple output semantics in Python

2019-11-07 Thread Sam Rohde
Hi All,

In the Python SDK there are three ways of representing the output of a
PTransform with multiple PCollections:

   - dictionary: PCollection tag --> PCollection
   - tuple: index --> PCollection
   - DoOutputsTuple: tag, index, or field name --> PCollection

I find this inconsistent way of accessing multiple outputs to be confusing.
Say that you have an arbitrary PTransform with multiple outputs. How do you
know how to access an individual output without looking at the source
code? *You
can't!* Remember there are three representations of multiple outputs. So,
you need to look at the output type and determine what the output actually
is.

What purpose does it serve to have three different ways of representing a
single concept of multiple output PCollections?

My proposal is to have a single representation analogous to Java's
PCollectionTuple. With this new type you will able to access PCollections
by tag with the "[ ]" operator or by field name. It should also up-convert
returned tuples, dicts, and DoOutputsTuples from composites into this new
type.

Full example:

class SomeCustomComposite(PTransform):
  def expand(self, pcoll):
def my_multi_do_fn(x):
  if isinstance(x, int):
yield pvalue.TaggedOutput('number', x)
  if isinstance(x, str):
yield pvalue.TaggedOutput('string', x)

def printer(x):
  print(x)
  yield x

outputs = pcoll | beam.ParDo(my_multi_do_fn).with_outputs()*
return pvalue.PTuple({
'number': output.number | beam.ParDo(printer),
'string': output.string | beam.ParDo(printer)
})*

p = beam.Pipeline()
*main = p | SomeCustomComposite()*

# Access PCollection by field name.
numbers = *main.number* | beam.ParDo(...)

# Access PCollection by tag.
strings = *main['string']* | beam.ParDo(...)

What do you think? Does this clear up the confusion of using multiple
output PCollections in Python?

Regards,
Sam


Re: Cython unit test suites running without Cythonized sources

2019-11-07 Thread Chad Dombrova
On Thu, Nov 7, 2019 at 11:31 AM Robert Bradshaw  wrote:

> Does python setup.py nosetests invoke build_ext (or, more generally,
> build)?


It's unclear from the nose source[1] whether it's calling build_py
and build_ext, or just build_ext.  It's also unclear whether the result of
that build is actually used.  When python setup.py nosetests runs, it runs
inside of a virtualenv created by tox, and tox has already installed beam
into that venv.  It seems unlikely to me that build_ext or build_py is
going to install over top of the beam package installed by tox, but who
knows, it may end up first on the path.  Also recall that there is an sdist
gradle task running before tox that creates a tarball that is passed to
run_tox.sh which passes it along to tox --installpkg flag[2] so that tox
won't build beam itself.

We should designate a single place that always does the build.  I thought
that was supposed to be the gradle sdist task, but invoking nose via
`python setup.py` means that we're introducing the possibility that a build
is occurring which may or may not be used, depending on the entirely
unclear dependencies of the setup commands, and the entirely unclear
relationship of that build output to the tox venv.  As a first step of
clarity, we could stop invoking nose using `python setup.py nosetests`, and
instead use `nosetests` (and in the future `pytest`).  I think the reason
for `python setup.py nosetest` is to ensure the test requirements are
installed, but we could shift those to a separate requirements file or use
extra_requires and tox can ensure that they're installed.  I find these two
options to be pretty common patterns [3].

[1] https://github.com/nose-devs/nose/blob/master/nose/commands.py#L113
[2]
https://tox.readthedocs.io/en/latest/config.html#cmdoption-tox-installpkg
[3]
https://stackoverflow.com/questions/29870629/pip-install-test-dependencies-for-tox-from-setup-py



> It's possible cython is present, but the build step is not
> invoked which would explain the skip for slow_coders_test. The correct
> test is being used in
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/fast_coders_test.py#L34
>
> On Thu, Nov 7, 2019 at 11:20 AM Ahmet Altay  wrote:
> >
> > I believe tox is correctly installing cython and executes "python
> setup.py nosetests" which triggers cythonzation path inside setup.py. Some
> indications that cython is installed and used is the following log entries
> (from a recent precommit cron job [1])
> > - [ 1/12] Cythonizing apache_beam/coders/coder_impl.py
> > - Errors with cython.cast in the stack traces.
> > - Tests skipped with: test_using_slow_impl
> (apache_beam.coders.slow_coders_test.SlowCoders) ... SKIP: Found cython,
> cannot test non-compiled implementation.
> >
> > At the same time there are log entries as following:
> > - test_using_fast_impl (apache_beam.coders.fast_coders_test.FastCoders)
> ... SKIP: Cython is not installed
> >
> > It might be an issue with what these tests are suing to check whether
> they are cythonized or not. We seem to have at least 2 different versions
> of this check [2][3]. Maybe we need to converge on one (former?).
> >
> > Ahmet
> >
> > [1]
> https://builds.apache.org/view/A-D/view/Beam/view/All/job/beam_PreCommit_Python_Cron/2008/consoleFull
> > [2]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/slow_coders_test.py#L32
> > [3]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/tools/utils.py#L33
> >
> >
> >
> > On Wed, Nov 6, 2019 at 6:19 PM Chad Dombrova  wrote:
> >>
> >> Another potential solution would be to _not_ use the sdist task to
> build the tarball and let tox do it.  Tox should install cython on
> supported platforms before running sdist itself (which it does by default
> unless you explicitly provide it with a tarball, which we are doing).  This
> has the added benefit of one less virtualenv.  Right now we create a
> virtualenv to build the sdist tarball, then we create another virtualenv to
> run tox, then tox creates a virtualenv to run the task.   It's unclear (to
> me) whether the tarball is rebuilt for each tox task or if it's reused.
> >
> >
> > I do not know if not passing the tarball will solve the issue. I tried
> this and ran into the same problem.
> >
> > I agree that we can get rid of setup virtualenv task if it is not adding
> value.
> >
> >>
> >>
> >> -chad
> >>
> >>
> >> On Wed, Nov 6, 2019 at 6:13 PM Udi Meiri  wrote:
> >>>
> >>> I opened this bug today after commenting on Chad's type hints PR.
> >>> https://issues.apache.org/jira/browse/BEAM-8572?filter=-1
> >
> >
> > Thank you for filing an issue.
> >
> >>>
> >>>
> >>>
> >>> I am 95% sure that our Precommit tests are using tarballs that are
> built without Cython (including the Cython tasks).
> >>>
> >>> I'm NOT currently working on fixing this. One solution might be to add
> an additional task (sdistCython) and tell gradle that sdist and the new
> task should not run 

Re: ES 7.0 Support Development

2019-11-07 Thread David Morávek
Hi Zhong,

just fyi, there is another ongoing effort on adding es 7 support.

https://github.com/apache/beam/pull/10025

you guys should get in touch ;)

D.

Sent from my iPhone

> On 7 Nov 2019, at 20:20, Zhong Chen  wrote:
> 
> 
> Hi all,
> 
> I have made a PR for adding ES 7.0 support here. However the unit tests are 
> failing because for some reason the test cluster is not publishing http 
> endpoints correctly, which is leading to connection refused exception. I am 
> still trying to figure that out. Any help would be much appreciated!
> 
> In addition, can someone take a look at my existing PR and provide some 
> feedback?
> 
> https://issues.apache.org/jira/browse/BEAM-5192
> 
> 
> 
> Zhong Chen
> Big Data and Analytics Cloud Consultant
> 650-502-0142
> zhongc...@google.com
> 
> 
> 
> 
> 
> 


RabbitMQ and CheckpointMark feasibility

2019-11-07 Thread Daniel Robert
(Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library. 
As part of this I switched to a pull-based API rather than the 
previously-used push-based. This has caused some nebulous problems so 
put up a correction PR that I think needs some eyes fairly quickly as 
I'd consider master to be broken for rabbitmq right now. The PR keeps 
the upgrade but reverts to the same push-based implementation as in 4.x: 
https://github.com/apache/beam/pull/9977 )


Regardless, in trying to get the pull-based API to work, I'm finding the 
interactions between rabbitmq and beam with CheckpointMark to be 
fundamentally impossible to implement so I'm hoping for some input here.


CheckointMark itself must be Serializable, presumably this means it gets 
shuffled around between nodes. However 'Channel', the tunnel through 
which it communicates with Rabbit to ack messages and finalize the 
checkpoint, is non-Serializable. Like most other CheckpointMark 
implementations, Channel is 'transient'. When a new CheckpointMark is 
instantiated, it's given a Channel. If an existing one is supplied to 
the Reader's constructor (part of the 'startReader()' interface), the 
channel is overwritten.


*However*, Rabbit does not support 'ack'ing messages on a channel other 
than the one that consumed them in the first place. Attempting to do so 
results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See 
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed 
).


Truthfully, I don't really understand how the current implementation is 
working; it seems like a happy accident. But I'm curious if someone 
could help me debug and implement how to bridge the 
re-usable/serializable CheckpointMark requirement in Beam with this 
limitation of Rabbit.


Thanks,
-Daniel Robert



Re: [Discuss] Beam Summit 2020 Dates & locations

2019-11-07 Thread Ahmet Altay
I prefer bay are for NA summit. My reasoning is that there is a criticall
mass of contributors and users in that location, probably more than
alternative NA locations. I was not involved with planning recently and I
do not know if there were people who could attend due to location
previously. If that is the case, I agree with Elliotte on looking for other
options.

Related to dates: March (Asia) and mid-May (NA) dates are a bit close.
Mid-June for NA might be better to spread events. Other pieces looks good.

Ahmet

On Thu, Nov 7, 2019 at 7:09 AM Elliotte Rusty Harold 
wrote:

> The U.S. sadly is not a reliable destination for international
> conferences these days. Almost every conference I go to, big and
> small, has at least one speaker, sometimes more, who can't get into
> the country. Canada seems worth considering. Vancouver, Montreal, and
> Toronto are all convenient.
>
> On Wed, Nov 6, 2019 at 2:17 PM Griselda Cuevas  wrote:
> >
> > Hi Beam Community!
> >
> > I'd like to kick off a thread to discuss potential dates and venues for
> the 2020 Beam Summits.
> >
> > I did some research on industry conferences happening in 2020 and
> pre-selected a few ranges as follows:
> >
> > (2 days) NA between mid-May and mid-June
> > (2 days) EU mid October
> > (1 day) Asia Mini Summit:  March
> >
> > I'd like to hear your thoughts on these dates and get consensus on exact
> dates as the convo progresses.
> >
> > For locations these are the options I reviewed:
> >
> > NA: Austin Texas, Berkeley California, Mexico City.
> > Europe: Warsaw, Barcelona, Paris
> > Asia: Singapore
> >
> > Let the discussion begin!
> > G (on behalf of the Beam Summit Steering Committee)
> >
> >
> >
>
>
> --
> Elliotte Rusty Harold
> elh...@ibiblio.org
>


Re: Key encodings for state requests

2019-11-07 Thread Maximilian Michels

While the Go SDK doesn't yet support a State API, Option 3) is what the Go SDK 
does for all non-standard coders (aka custom coders) anyway.


For wire transfer, the Java Runner also adds a LengthPrefixCoder for the 
coder and its subcomponents. The problem is that this is an implicit 
assumption made. In the Proto, we do not have this represented. This is 
why **for state requests**, we end up with a 
"LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on 
the SDK Harness side. Note that the Python Harness does wrap unknown 
coders in a LengthPrefixCoder for transferring regular elements, but the 
LengthPrefixCoder is not preserved for the state requests.


In that sense (3) is good because it follows this implicit notion of 
adding a LengthPrefixCoder for wire transfer, but applies it to state 
requests.


However, option (1) is most reliable because the LengthPrefixCoder is 
actually in the Proto. So "CustomCoder" will always be represented as 
"LengthPrefixCoder[CustomCoder]", and only standard coders will be added 
without a LengthPrefixCoder.



I'd really like to avoid implicit agreements about how the coder that
should be used differs from what's specified in the proto in different
contexts.


Option (2) would work on top of the existing logic because replacing a 
non-standard coder with a "NOOP coder" would just be used by the Runner 
to produce a serialized version of the key for partitioning. Flink 
always operates on the serialized key, be it standard or non-standard 
coder. It wouldn't be necessary to change any of the existing wire 
transfer logic or representation. I understand that it would be less 
ideal, but maybe easier to fix for the release.



The key concept here is not "standard coder" but "coder that the
runner does not understand." This knowledge is only in the runner.
Also has the downside of (2).


Yes, I had assumed "non-standard" and "unknown" are the same, but the 
latter can be a subset of the former, i.e. if a Runner does not support 
all of the standard coders for some reason.



This means that the wire format that the runner sends for the "key" represents 
the exact same wire format it will receive for state requests.


The wire format for the entire element is the same. Otherwise we 
wouldn't be able to process data between the Runner and the SDK Harness. 
However, the problem is that the way the Runner instantiates the key 
coder to partition elements, does not match how the SDK encodes the key 
when it sends a state request to the Runner. Conceptually, those two 
situations should be the same, but in practice they are not.



Now that I thought about it again option (1) is probably the most 
explicit and in that sense cleanest. However, option (3) is kind of fair 
because it would just replicate the implicit LengthPrefixCoder behavior 
we have for general wire transfer also for state requests. Option (2) I 
suppose is the most implicit and runner-specific, should probably be 
avoided in the long run.


So I'd probably opt for (1) and I would update the PR[1] rather soon 
because this currently blocks the release, as this is a regression from 
2.16.0.[2]



-Max

[1] https://github.com/apache/beam/pull/9997
[2] (In 2.16.0 it worked for Python because the Runner used a 
ByteArrayCoder with the OUTER encoding context for the key which was 
basically option (2). Only problem that, for standard coders the Java 
SDK Harness produced non-matching state request keys, due to it using 
the NESTED context.)


On 07.11.19 18:01, Luke Cwik wrote:



On Thu, Nov 7, 2019 at 8:22 AM Robert Bradshaw > wrote:


On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels mailto:m...@apache.org>> wrote:
 >
 > Thanks for the feedback thus far. Some more comments:
 >
 > > Instead, the runner knows ahead of time that it
 > > will need to instantiate this coder, and should update the bundle
 > > processor to specify KvCoder,
 > > VarIntCoder> as the coder so both can pull it out in a
consistent way.
 >
 > By "update the bundle processor", do you mean modifying the
 > ProcessBundleDescriptor's BagUserState with the correct key coder?
 > Conceptually that is possible, but the current implementation
does not
 > allow for this to happen:
 >

https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
 > It enforces ByteString which does not tell the SDK Harness anything
 > about the desired encoding.

I meant update the BundleProcessDescriptor proto that is sent to the
SDK

https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L140,
essentially option (1).


For clarity, the "key" coder is specified by the stateful ParDo's main 
input PCollection. This means that the ProcessBundleDescriptor should 

Re: Cython unit test suites running without Cythonized sources

2019-11-07 Thread Robert Bradshaw
Does python setup.py nosetests invoke build_ext (or, more generally,
build)? It's possible cython is present, but the build step is not
invoked which would explain the skip for slow_coders_test. The correct
test is being used in
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/fast_coders_test.py#L34

On Thu, Nov 7, 2019 at 11:20 AM Ahmet Altay  wrote:
>
> I believe tox is correctly installing cython and executes "python setup.py 
> nosetests" which triggers cythonzation path inside setup.py. Some indications 
> that cython is installed and used is the following log entries (from a recent 
> precommit cron job [1])
> - [ 1/12] Cythonizing apache_beam/coders/coder_impl.py
> - Errors with cython.cast in the stack traces.
> - Tests skipped with: test_using_slow_impl 
> (apache_beam.coders.slow_coders_test.SlowCoders) ... SKIP: Found cython, 
> cannot test non-compiled implementation.
>
> At the same time there are log entries as following:
> - test_using_fast_impl (apache_beam.coders.fast_coders_test.FastCoders) ... 
> SKIP: Cython is not installed
>
> It might be an issue with what these tests are suing to check whether they 
> are cythonized or not. We seem to have at least 2 different versions of this 
> check [2][3]. Maybe we need to converge on one (former?).
>
> Ahmet
>
> [1] 
> https://builds.apache.org/view/A-D/view/Beam/view/All/job/beam_PreCommit_Python_Cron/2008/consoleFull
> [2] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/slow_coders_test.py#L32
> [3] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/tools/utils.py#L33
>
>
>
> On Wed, Nov 6, 2019 at 6:19 PM Chad Dombrova  wrote:
>>
>> Another potential solution would be to _not_ use the sdist task to build the 
>> tarball and let tox do it.  Tox should install cython on supported platforms 
>> before running sdist itself (which it does by default unless you explicitly 
>> provide it with a tarball, which we are doing).  This has the added benefit 
>> of one less virtualenv.  Right now we create a virtualenv to build the sdist 
>> tarball, then we create another virtualenv to run tox, then tox creates a 
>> virtualenv to run the task.   It's unclear (to me) whether the tarball is 
>> rebuilt for each tox task or if it's reused.
>
>
> I do not know if not passing the tarball will solve the issue. I tried this 
> and ran into the same problem.
>
> I agree that we can get rid of setup virtualenv task if it is not adding 
> value.
>
>>
>>
>> -chad
>>
>>
>> On Wed, Nov 6, 2019 at 6:13 PM Udi Meiri  wrote:
>>>
>>> I opened this bug today after commenting on Chad's type hints PR.
>>> https://issues.apache.org/jira/browse/BEAM-8572?filter=-1
>
>
> Thank you for filing an issue.
>
>>>
>>>
>>>
>>> I am 95% sure that our Precommit tests are using tarballs that are built 
>>> without Cython (including the Cython tasks).
>>>
>>> I'm NOT currently working on fixing this. One solution might be to add an 
>>> additional task (sdistCython) and tell gradle that sdist and the new task 
>>> should not run concurrently.
>>> Does anyone want to take this up?


Re: Permission to contribute on LZO compression enablement for Beam Java SDK

2019-11-07 Thread Luke Cwik
Welcome, I have added you as a contributor and assigned the ticket to you.

On Thu, Nov 7, 2019 at 4:21 AM Amogh Tiwari  wrote:

> Hi,
>
> I would like to contribute on enabling Apache Beam's java SDK to work with
> LZO compression. Please add me as a contributor so that I can work on this.
> I've also raised a ticket
>  for the same.
>
> Thanks and best regards,
> Amogh Tiwari
>


Re: Deprecate some or all of TestPipelineOptions?

2019-11-07 Thread Luke Cwik
There was issue with asynchrony of p.run(), some runners blocked till the
pipeline was complete with p.run() which was never meant to be the intent.

The test timeout one makes sense to be able to configure it per runner
(since Dataflow takes a lot longer than other runners) but we may be able
to configure a Junit test timeout attribute instead.

I would be for getting rid of them.


On Wed, Nov 6, 2019 at 3:36 PM Robert Bradshaw  wrote:

> +1 to all of these are probably obsolete at this point and would be
> nice to remove.
>
>
> On Wed, Nov 6, 2019 at 3:00 PM Kenneth Knowles  wrote:
> >
> > Good find. I think TestPipelineOptions is from very early days. It makes
> sense to me that these are all obsolete. Some guesses, though I haven't dug
> through commit history to confirm:
> >
> >  - TempRoot: a while ago TempLocation was optional, so I think this
> would provide a default for things like gcpTempLocation and stagingLocation
> >  - OnSuccessMatcher: for runners where pipeline used to not terminate in
> streaming mode. Now I think every runner can successfully waitUntilFinish.
> Also the current API for waitUntilFinish went through some evolutions
> around asynchrony so it wasn't always a good choice.
> >  - OnCreateMatcher: just for symmetry? I don't know
> >  - TestTimeoutSeconds: probably also for the asychrony/waitUntilfinish
> issue
> >
> > Kenn
> >
> > On Wed, Nov 6, 2019 at 12:19 PM Brian Hulette 
> wrote:
> >>
> >> I recently came across TestPipelineOptions, and now I'm wondering if
> maybe it should be deprecated. It only seems to actually be supported for
> Spark and Dataflow (via TestSparkRunner and TestDataflowRunner), and I
> think it may make more sense to move the functionality it provides into the
> tests that need it.
> >>
> >> TestPipelineOptions currently has four attributes:
> >>
> >> # TempRoot
> >> It's purpose isn't documented, but many tests read TempRoot and use it
> to set a TempLocation (example). I think this attribute makes sense (e.g.
> we can set TempRoot once and each test has its own subdirectory), but I'm
> not sure. Can anyone confirm the motivation for it? I'd like to at least
> add a docstring for it.
> >>
> >> # OnCreateMatcher
> >> A way to register a matcher that will be checked right after a pipeline
> has started. It's never set except for in TestDataflowRunnerTest, so I
> think this is absolutely safe to remove.
> >>
> >> # OnSuccessMatcher
> >> A way to register a matcher that will be checked right after a pipeline
> has successfully completed. This is used in several tests
> (RequiresStableInputIT, WordCountIT, ... 8 total occurrences), but I don't
> see why they couldn't all be replaced with a `p.run().waitUntilFinish()`,
> followed by an assert.
> >>
> >> I think the current approach is actually dangerous, because running
> these tests with runners other than TestDataflowRunner or TestSparkRunner
> means the matchers are never actually checked. This is actually how I came
> across TestPipelineOptions - I tried running a test with the DirectRunner
> and couldn't make it fail.
> >>
> >> # TestTimeoutSeconds
> >> Seems to just be a wrapper for `waitUntilFinish(duration)`, and only
> used in one place. I think it would be cleaner for the test to be
> responsible for calling waitUntilFinish (which we do elsewhere), the only
> drawback is it requires a small refactor so the test has access to the
> PipelineResult object.
> >>
> >>
> >> So I have a couple of questions for the community
> >> 1) Are there thoughts on TempRoot? Can we get rid of it?
> >> 2) Are there any objections to removing the other three attributes? Am
> I missing something? Unless there are any objections I think I'll write a
> patch to remove them.
> >>
> >> Thanks,
> >> Brian
>


ES 7.0 Support Development

2019-11-07 Thread Zhong Chen
Hi all,

I have made a PR  for adding ES
7.0 support here. However the unit tests are failing because for some
reason the test cluster is not publishing http endpoints correctly, which
is leading to connection refused exception. I am still trying to figure
that out. Any help would be much appreciated!

In addition, can someone take a look at my existing PR and provide some
feedback?

https://issues.apache.org/jira/browse/BEAM-5192




*Zhong Chen*

Big Data and Analytics Cloud Consultant

650-502-0142

zhongc...@google.com


Re: Cython unit test suites running without Cythonized sources

2019-11-07 Thread Ahmet Altay
I believe tox is correctly installing cython and executes "python setup.py
nosetests" which triggers cythonzation path inside setup.py. Some
indications that cython is installed and used is the following log entries
(from a recent precommit cron job [1])
- [ 1/12] Cythonizing apache_beam/coders/coder_impl.py
- Errors with cython.cast in the stack traces.
- Tests skipped with: test_using_slow_impl
(apache_beam.coders.slow_coders_test.SlowCoders) ... SKIP: Found cython,
cannot test non-compiled implementation.

At the same time there are log entries as following:
- test_using_fast_impl (apache_beam.coders.fast_coders_test.FastCoders) ...
SKIP: Cython is not installed

It might be an issue with what these tests are suing to check whether they
are cythonized or not. We seem to have at least 2 different versions of
this check [2][3]. Maybe we need to converge on one (former?).

Ahmet

[1]
https://builds.apache.org/view/A-D/view/Beam/view/All/job/beam_PreCommit_Python_Cron/2008/consoleFull
[2]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/slow_coders_test.py#L32
[3]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/tools/utils.py#L33



On Wed, Nov 6, 2019 at 6:19 PM Chad Dombrova  wrote:

> Another potential solution would be to _not_ use the sdist task to build
> the tarball and let tox do it.  Tox should install cython on supported
> platforms before running sdist itself (which it does by default unless you
> explicitly provide it with a tarball, which we are doing).  This has the
> added benefit of one less virtualenv.  Right now we create a virtualenv to
> build the sdist tarball, then we create another virtualenv to run tox, then
> tox creates a virtualenv to run the task.   It's unclear (to me) whether
> the tarball is rebuilt for each tox task or if it's reused.
>

I do not know if not passing the tarball will solve the issue. I tried this
and ran into the same problem.

I agree that we can get rid of setup virtualenv task if it is not adding
value.


>
> -chad
>
>
> On Wed, Nov 6, 2019 at 6:13 PM Udi Meiri  wrote:
>
>> I opened this bug today after commenting
>>  on
>> Chad's type hints PR.
>> https://issues.apache.org/jira/browse/BEAM-8572?filter=-1
>>
>
Thank you for filing an issue.


>
>>
>> I am 95% sure that our Precommit tests are using tarballs that are built
>> without Cython (including the Cython tasks).
>>
>> I'm NOT currently working on fixing this. One solution might be to add an
>> additional task (sdistCython) and tell gradle that sdist and the new task
>> should not run concurrently.
>> Does anyone want to take this up?
>>
>


Re: Contributing to Beam javadoc

2019-11-07 Thread Luke Cwik
Welcome and I just merged your PR.

On Wed, Nov 6, 2019 at 1:15 PM Ismaël Mejía  wrote:

> Done, you can now self assign issues too, welcome Jonathan!
>
> On Wed, Nov 6, 2019 at 10:00 PM Jonathan Alvarez-Gutierrez
>  wrote:
> >
> > Hey,
> >
> > I just filed https://issues.apache.org/jira/browse/BEAM-8573 and wanted
> to create a PR with a fix.
> >
> > I should also check if there's an extant documentation / Splittable DoFn
> project that would pre-empt or subsume my teeny documentation fix.
> >
> > If not, I'd like to assign the issue to jagthebeetle (myself).
> >
> > Best,
> > Jonathan
>


Re: Command for Beam worker on Spark cluster

2019-11-07 Thread Matthew K.

Thanks, but still have problem making remote worker on k8s work (important to point out that I had to create shared volume between nodes in order all have access to the same /tmp, since beam runner creates artifact staging files on the machine it is running on, and expects workers to read from it).

 

However, I get this error from executor:

 


INFO AbstractArtifactRetrievalService: GetManifest for /tmp/beam-artifact-staging/job_cca3e889-76d9-4c8a-a942-a64ddbd2dd1f/MANIFEST
INFO AbstractArtifactRetrievalService: GetManifest for /tmp/beam-artifact-staging/job_cca3e889-76d9-4c8a-a942-a64ddbd2dd1f/MANIFEST -> 0 artifacts
INFO ProcessEnvironmentFactory: Still waiting for startup of environment '/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot' for worker id 3-1
ERROR Executor: Exception in task 0.1 in stage 0.0 (TID 2)


org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)

 


(note that job manifest has no artifacts in it)

 

I can see ports for enpoints (logging, artifact, ...) are open on the worker. Some debugging to boot.go and running it manually shows it doesn't return from "artifact.Materialize" function.

 

Any idea what could be wrong in setup?

 

Sent: Wednesday, November 06, 2019 at 5:45 PM
From: "Kyle Weaver" 
To: dev 
Subject: Re: Command for Beam worker on Spark cluster



> Where can I extract these parameters from?

 

These parameters should be passed automatically when the process is run (note the use of $* in the example script): https://github.com/apache/beam/blob/fbc84b61240a3d83d9c19f7ccc17ba22e5d7e2c9/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java#L115-L121

 


> Also, how spark executor can find the port that grpc server is running on?


Not sure which grpc server you mean here.

 


On Wed, Nov 6, 2019 at 3:32 PM Matthew K.  wrote:





Thanks, still I need to pass parameters to the boot executable, such as, worker id, control endpoint, logging endpoint, etc.

 

Where can I extract these parameters from? (In apache_beam Python code, those can be extracted from StartWorker request parameters)

 

Also, how spark executor can find the port that grpc server is running on?

 

Sent: Wednesday, November 06, 2019 at 5:07 PM
From: "Kyle Weaver" 
To: dev 
Subject: Re: Command for Beam worker on Spark cluster


In Docker mode, most everything's taken care of for you, but in process mode you have to do a lot of setup yourself. The command you're looking for is `sdks/python/container/build/target/launcher/linux_amd64/boot`. You will be required to have both that executable (which you can build from source using `./gradlew :sdks:python:container:build`) and a Python installation including Beam and other dependencies on all of your worker machines.
 
The best example I know of is here: https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165


 


On Wed, Nov 6, 2019 at 2:24 PM Matthew K.  wrote:




Hi all,

 

I am trying to run *Python* beam pipeline on a Spark cluster. Since workers are running on separate nodes, I am using "PROCESS" for "evironment_type" in pipeline options, but I couldn't find any documentation on what "command" I should pass to "environment_config" to run on the worker, so executor can be able to communicate with.

 

Can someone help me on that?


















Re: Getting contributor permission to JIRA

2019-11-07 Thread Luke Cwik
Welcome, I have added you as a contributor and assigned BEAM-8575 to you.

On Wed, Nov 6, 2019 at 5:37 PM Wenjia Liu  wrote:

> Hi,
>
> This is Wendy from Google. I'm contributing to adding more tests for Beam
> Python. Could anyone add me as a contributor for JIRA? I'd like to assign
> this issue BEAM-8575 to myself.
>
> Thanks,
> Wendy
>


Re: (Question) SQL integration tests for MongoDb

2019-11-07 Thread Kirill Kozlov
Thank you for your response!

I want to make sure that when tests run on Jenkins they get supplied with
pipelines options containing hostName and Port of a running MongoDb service.

I'm writing integration test for a MongoDb SQL adapter (located
sdks/java/extensions/sql/meta/provider/mongodb).
I cannot simply use `enableJavaPerformanceTesting()`, because tests for all
adapters are run via the same build file, which has a custom task
"integrationTest".

I hope this better explains the problem I am trying to tackle.

-
Kirill

On Thu, Nov 7, 2019, 03:36 Michał Walenia 
wrote:

> Hi,
>
> What exactly are you trying to do? If you're looking for a way to provide
> pipeline options to the MongoDBIOIT, you can pass them via command line
> like this:
>
> ./gradlew integrationTest -p sdks/java/io/mongodb
>
>
>
> * -DintegrationTestPipelineOptions='[   "--mongoDBHostName=1.2.3.4",
>  "--mongoDBPort=27017",   "--mongoDBDatabaseName=mypass",
>  "--numberOfRecords=1000" ]'*
>--tests org.apache.beam.sdk.io.mongodb.MongoDbIOIT
>-DintegrationTestRunner=direct
>
> Gradle tasks created with `enableJavaPerformanceTesting()` will allow such
> options to be passed.
>
> If you're trying to do something else, please let me know.
>
> Regards
> Michal
>
> On Thu, Nov 7, 2019 at 1:44 AM Kirill Kozlov 
> wrote:
>
>> Hi everyone!
>>
>> I am trying to test MongoDb Sql Table, but not quite sure how to pass
>> pipeline options with the hostName, port, and databaseName used by Jenkins.
>>
>> It looks like the integration test for MongoDbIO Connector obtain those
>> values from the
>> 'beam/.test-infra/jenkins/job_PerformanceTests_MongoDBIO_IT.groovy' file
>> via calling the following methods in the 'gradle.build' file:
>> provideIntegrationTestingDependencies()
>> enableJavaPerformanceTesting()
>>
>> Sql build file already has a task with the name 'integrationTest' defined
>> and does not let us do `enableJavaPerformanceTesting()`.
>>
>>  I would really appreciate if someone could provide me with a couple of
>> pointers on getting this to work.
>>
>> -
>> Kirill
>>
>
>
> --
>
> Michał Walenia
> Polidea  | Software Engineer
>
> M: +48 791 432 002 <+48791432002>
> E: michal.wale...@polidea.com
>
> Unique Tech
> Check out our projects! 
>


Re: 10,000 Pull Requests

2019-11-07 Thread Maximilian Michels

Yes! Keep the committer pipeline filled ;)

Reviewing PRs probably remains one of the toughest problems in active 
open-source projects.


On 07.11.19 18:28, Luke Cwik wrote:

We need more committers...
that review the code.

On Wed, Nov 6, 2019 at 6:21 PM Pablo Estrada > wrote:


iiipe : )

On Thu, Nov 7, 2019 at 12:59 AM Kenneth Knowles mailto:k...@apache.org>> wrote:

Awesome!

Number of days from PR #1 and PR #1000: 211
Number of days from PR #9000 and PR #1: 71

Kenn

On Wed, Nov 6, 2019 at 6:28 AM Łukasz Gajowy mailto:lgaj...@apache.org>> wrote:

Yay! Nice! :)

śr., 6 lis 2019 o 14:38 Maximilian Michels mailto:m...@apache.org>> napisał(a):

Just wanted to point out, we have crossed the 10,000 PRs
mark :)

...and the winner is:
https://github.com/apache/beam/pull/1

Seriously, I think Beam's culture to promote PRs over
direct access to
the repository is remarkable. To another 10,000 PRs!

Cheers,
Max



Re: 10,000 Pull Requests

2019-11-07 Thread Luke Cwik
We need more committers...
that review the code.

On Wed, Nov 6, 2019 at 6:21 PM Pablo Estrada  wrote:

> iiipe : )
>
> On Thu, Nov 7, 2019 at 12:59 AM Kenneth Knowles  wrote:
>
>> Awesome!
>>
>> Number of days from PR #1 and PR #1000: 211
>> Number of days from PR #9000 and PR #1: 71
>>
>> Kenn
>>
>> On Wed, Nov 6, 2019 at 6:28 AM Łukasz Gajowy  wrote:
>>
>>> Yay! Nice! :)
>>>
>>> śr., 6 lis 2019 o 14:38 Maximilian Michels  napisał(a):
>>>
 Just wanted to point out, we have crossed the 10,000 PRs mark :)

 ...and the winner is: https://github.com/apache/beam/pull/1

 Seriously, I think Beam's culture to promote PRs over direct access to
 the repository is remarkable. To another 10,000 PRs!

 Cheers,
 Max

>>>


Re: [PROPOSAL] Storing, displaying and detecting anomalies in test results

2019-11-07 Thread Kamil Wasilewski
Thanks for spotting this! It should be working fine now.

On Thu, Nov 7, 2019 at 5:40 PM Dan Gazineu  wrote:

> Thank you for the update Kamil! Please fix the sharing options in the new
> doc.
>
> On Thu, Nov 7, 2019 at 7:22 AM Kamil Wasilewski <
> kamil.wasilew...@polidea.com> wrote:
>
>> Hi all,
>>
>> For a while we have been working on the implementation of proposal
>> presented in this thread. Just to remind what it was about in short words —
>> we wanted to use Prometheus and Grafana to visualize Beam performance tests
>> results in addition to detect regressions automatically. A couple of
>> obstacles forced us to change our approach and that's why I want to share
>> with you a major proposal revision which you can find here:
>> https://s.apache.org/test-metrics-storage-corrected.
>>
>> As usual, we are open to your suggestions and thoughts.
>>
>> Thanks,
>> Kamil
>>
>> On Thu, Sep 5, 2019 at 5:11 PM Kamil Wasilewski <
>> kamil.wasilew...@polidea.com> wrote:
>>
>>> We've just started working on this and here comes the first PR:
>>> https://github.com/apache/beam/pull/9482.
>>>
>>> Feel free to share your thoughts and ideas.
>>>
>>> Kamil
>>>
>>> On Tue, Aug 27, 2019 at 1:19 AM Pablo Estrada 
>>> wrote:
>>>
 Thanks Kamil for bringing this!
 +Manisha Bhardwaj  +Mark Liu  have
 worked on internal benchmarking at Google - would you take a look please?

 On Fri, Aug 23, 2019 at 3:22 AM Kamil Wasilewski <
 kamil.wasilew...@polidea.com> wrote:

> Hi all,
>
> Recently we did some research on how to visualize IO performance
> tests, Nexmark and Load test results better and how to detect regressions
> automatically in an easy way using tools dedicated for the job.
>
> We'd like to share a proposal with you:
> 
> https://s.apache.org/test-metrics-storage
>
>
>
> Any comments are highly appreciated.
>
> Thanks,
>
> Kamil
>



Re: [DISCUSS] Avoid redundant encoding and decoding between runner and harness

2019-11-07 Thread Luke Cwik
I did suggest one other alternative on Jincheng's PR[1] which was to allow
windowless values to be sent across the gRPC port. The SDK would then be
responsible for ensuring that the execution didn't access any properties
that required knowledge of the timestamp, pane or window. This is different
then adding the ValueOnlyWindowedValueCoder as a model coder because it
allows SDKs to pass around raw values to functions without any windowing
overhead which could be useful for things like the side input window
mapping or window merging functions we have.

1: https://github.com/apache/beam/pull/9979

On Thu, Nov 7, 2019 at 8:48 AM Robert Bradshaw  wrote:

> I think there is some misunderstanding about what is meant by option
> 2. What Kenn (I think) and I are proposing is not a WindowedValueCoder
> whose window/timestamp/paneinfo coders are parameterized to be
> constant coders, but a WindowedValueCoder whose
> window/timestamp/paneinfo values are specified as constants in the
> coder.
>
> Let's call this NewValueOnlyWindowedValueCoder, and is parameterized
> by a window, timestamp, and pane info instance
>
> The existing ValueOnlyWindowedValueCoder is literally
> NewValueOnlyWindowedValueCoder(GlobalWindow, MIN_TIMESTAMP,
> PaneInfo.NO_FIRING). Note in particular that using the existing
> ValueOnlyWindowedValueCoder would give the wrong timestamp and pane
> info if it is use for the result of a GBK, which I think is the loss
> of consistency referred to here.
>
> On Thu, Nov 7, 2019 at 1:03 AM jincheng sun 
> wrote:
> >
> > Thanks for your feedback and the valuable comments, Kenn & Robert!
> >
> > I think your comments are more comprehensive and enlighten me a lot. The
> two proposals which I mentioned above are to reuse the existing coder
> (FullWindowedValueCoder and ValueOnlyWindowedValueCoder). Now, with your
> comments, I think we can further abstract 'FullWindowedValueCoder' and
> 'ValueOnlyWindowedValueCoder', that is, we can rename
> 'FullWindowedValueCoder' to 'WindowedValueCoder', and make the coders of
> window/timestamp/pane configurable. Then we can remove
> 'ValueOnlyWindowedValueCoder' and need to add a mount of constant coders
> for window/timestamp/pane.
> >
> > I have replied your comments on the doc, and quick feedback as following:
> >
> > Regarding to "Approach 2: probably no SDK harness work / compatible with
> existing Beam model so no risk of introducing inconsistency",if we "just
> puts default window/timestamp/pane info on elements" and don't change the
> original coder, then the performance is not optimized. If we want to get
> the best performance, then the default coder of Window/timestamp/pane
> should be constant coder. In this case the SDK harnesses need to be aware
> of the constant coder and there will be some development work in the SDK
> harness. Besides, the SDK harness also needs to make the coders for
> window/timestamp/pane configurable and this will introduce some related
> changes, such as updating WindowedValueCoder._get_component_coders, etc.
> >
> > Regarding to "Approach 1: option a: if the SDK harness has to understand
> 'values without windows' then very large changes and high risk of
> introducing inconsistency (I eliminated many of these inconsistencies)", we
> only need to add ValueOnlyWindowedValueCoder to the StandardCoders and all
> the SDK harness should be aware of this coder. There is no much changes
> actually.
> >
> > Please feel free to correct me if there is anyting incorrect. :)
> >
> > Besides, I'm not quite clear about the consistency issues you meant
> here. Could you please give me some hints about this?
> >
> > Best,
> > Jincheng
> >
> > Robert Bradshaw  于2019年11月7日周四 上午3:38写道:
> >>
> >> Yes, the portability framework is designed to support this, and
> >> possibly even more efficient transfers of data than element-by-element
> >> as per the wire coder specified in the IO port operators. I left some
> >> comments on the doc as well, and would also prefer approach 2.
> >>
> >> On Wed, Nov 6, 2019 at 11:03 AM Kenneth Knowles 
> wrote:
> >> >
> >> > I think the portability framework is designed for this. The runner
> controls the coder on the grpc ports and the runner controls the process
> bundle descriptor.
> >> >
> >> > I commented on the doc. I think what is missing is analysis of scope
> of SDK harness changes and risk to model consistency
> >> >
> >> > Approach 2: probably no SDK harness work / compatible with
> existing Beam model so no risk of introducing inconsistency
> >> >
> >> > Approach 1: what are all the details?
> >> > option a: if the SDK harness has to understand "values
> without windows" then very large changes and high risk of introducing
> inconsistency (I eliminated many of these inconsistencies)
> >> > option b: if the coder just puts default
> window/timestamp/pane info on elements, then it is the same as approach 2,
> no work / no risk
> >> >
> >> > Kenn
> >> >
> >> > On Wed, Nov 6, 2019 at 1:09 AM 

Re: [DISCUSS] Avoid redundant encoding and decoding between runner and harness

2019-11-07 Thread Robert Bradshaw
I think there is some misunderstanding about what is meant by option
2. What Kenn (I think) and I are proposing is not a WindowedValueCoder
whose window/timestamp/paneinfo coders are parameterized to be
constant coders, but a WindowedValueCoder whose
window/timestamp/paneinfo values are specified as constants in the
coder.

Let's call this NewValueOnlyWindowedValueCoder, and is parameterized
by a window, timestamp, and pane info instance

The existing ValueOnlyWindowedValueCoder is literally
NewValueOnlyWindowedValueCoder(GlobalWindow, MIN_TIMESTAMP,
PaneInfo.NO_FIRING). Note in particular that using the existing
ValueOnlyWindowedValueCoder would give the wrong timestamp and pane
info if it is use for the result of a GBK, which I think is the loss
of consistency referred to here.

On Thu, Nov 7, 2019 at 1:03 AM jincheng sun  wrote:
>
> Thanks for your feedback and the valuable comments, Kenn & Robert!
>
> I think your comments are more comprehensive and enlighten me a lot. The two 
> proposals which I mentioned above are to reuse the existing coder 
> (FullWindowedValueCoder and ValueOnlyWindowedValueCoder). Now, with your 
> comments, I think we can further abstract 'FullWindowedValueCoder' and 
> 'ValueOnlyWindowedValueCoder', that is, we can rename 
> 'FullWindowedValueCoder' to 'WindowedValueCoder', and make the coders of 
> window/timestamp/pane configurable. Then we can remove 
> 'ValueOnlyWindowedValueCoder' and need to add a mount of constant coders for 
> window/timestamp/pane.
>
> I have replied your comments on the doc, and quick feedback as following:
>
> Regarding to "Approach 2: probably no SDK harness work / compatible with 
> existing Beam model so no risk of introducing inconsistency",if we "just puts 
> default window/timestamp/pane info on elements" and don't change the original 
> coder, then the performance is not optimized. If we want to get the best 
> performance, then the default coder of Window/timestamp/pane should be 
> constant coder. In this case the SDK harnesses need to be aware of the 
> constant coder and there will be some development work in the SDK harness. 
> Besides, the SDK harness also needs to make the coders for 
> window/timestamp/pane configurable and this will introduce some related 
> changes, such as updating WindowedValueCoder._get_component_coders, etc.
>
> Regarding to "Approach 1: option a: if the SDK harness has to understand 
> 'values without windows' then very large changes and high risk of introducing 
> inconsistency (I eliminated many of these inconsistencies)", we only need to 
> add ValueOnlyWindowedValueCoder to the StandardCoders and all the SDK harness 
> should be aware of this coder. There is no much changes actually.
>
> Please feel free to correct me if there is anyting incorrect. :)
>
> Besides, I'm not quite clear about the consistency issues you meant here. 
> Could you please give me some hints about this?
>
> Best,
> Jincheng
>
> Robert Bradshaw  于2019年11月7日周四 上午3:38写道:
>>
>> Yes, the portability framework is designed to support this, and
>> possibly even more efficient transfers of data than element-by-element
>> as per the wire coder specified in the IO port operators. I left some
>> comments on the doc as well, and would also prefer approach 2.
>>
>> On Wed, Nov 6, 2019 at 11:03 AM Kenneth Knowles  wrote:
>> >
>> > I think the portability framework is designed for this. The runner 
>> > controls the coder on the grpc ports and the runner controls the process 
>> > bundle descriptor.
>> >
>> > I commented on the doc. I think what is missing is analysis of scope of 
>> > SDK harness changes and risk to model consistency
>> >
>> > Approach 2: probably no SDK harness work / compatible with existing 
>> > Beam model so no risk of introducing inconsistency
>> >
>> > Approach 1: what are all the details?
>> > option a: if the SDK harness has to understand "values without 
>> > windows" then very large changes and high risk of introducing 
>> > inconsistency (I eliminated many of these inconsistencies)
>> > option b: if the coder just puts default window/timestamp/pane 
>> > info on elements, then it is the same as approach 2, no work / no risk
>> >
>> > Kenn
>> >
>> > On Wed, Nov 6, 2019 at 1:09 AM jincheng sun  
>> > wrote:
>> >>
>> >> Hi all,
>> >>
>> >> I am trying to make some improvements of portability framework to make it 
>> >> usable in other projects. However, we find that the coder between runner 
>> >> and harness can only be FullWindowedValueCoder. This means each time when 
>> >> sending a WindowedValue, we have to encode/decode timestamp, windows and 
>> >> pan infos. In some circumstances(such as using the portability framework 
>> >> in Flink), only values are needed between runner and harness. So, it 
>> >> would be nice if we can configure the coder and avoid redundant encoding 
>> >> and decoding between runner and harness to improve the performance.
>> >>
>> >> There are two approaches to 

Re: [PROPOSAL] Storing, displaying and detecting anomalies in test results

2019-11-07 Thread Dan Gazineu
Thank you for the update Kamil! Please fix the sharing options in the new
doc.

On Thu, Nov 7, 2019 at 7:22 AM Kamil Wasilewski <
kamil.wasilew...@polidea.com> wrote:

> Hi all,
>
> For a while we have been working on the implementation of proposal
> presented in this thread. Just to remind what it was about in short words —
> we wanted to use Prometheus and Grafana to visualize Beam performance tests
> results in addition to detect regressions automatically. A couple of
> obstacles forced us to change our approach and that's why I want to share
> with you a major proposal revision which you can find here:
> https://s.apache.org/test-metrics-storage-corrected.
>
> As usual, we are open to your suggestions and thoughts.
>
> Thanks,
> Kamil
>
> On Thu, Sep 5, 2019 at 5:11 PM Kamil Wasilewski <
> kamil.wasilew...@polidea.com> wrote:
>
>> We've just started working on this and here comes the first PR:
>> https://github.com/apache/beam/pull/9482.
>>
>> Feel free to share your thoughts and ideas.
>>
>> Kamil
>>
>> On Tue, Aug 27, 2019 at 1:19 AM Pablo Estrada  wrote:
>>
>>> Thanks Kamil for bringing this!
>>> +Manisha Bhardwaj  +Mark Liu  have
>>> worked on internal benchmarking at Google - would you take a look please?
>>>
>>> On Fri, Aug 23, 2019 at 3:22 AM Kamil Wasilewski <
>>> kamil.wasilew...@polidea.com> wrote:
>>>
 Hi all,

 Recently we did some research on how to visualize IO performance tests,
 Nexmark and Load test results better and how to detect regressions
 automatically in an easy way using tools dedicated for the job.

 We'd like to share a proposal with you:
 
 https://s.apache.org/test-metrics-storage



 Any comments are highly appreciated.

 Thanks,

 Kamil

>>>


Re: [spark structured streaming runner] merge to master?

2019-11-07 Thread Etienne Chauchot

Hi guys

@Kenn,

I just wanted to mention that I did answered your question on 
dependencies here: 
https://lists.apache.org/thread.html/5a85caac41e796c2aa351d835b3483808ebbbd4512b480940d494439@%3Cdev.beam.apache.org%3E


regarding jars:

I don't like 3 jars either.

I'm not in favor of having the 2 runners in one jar, the point about 
having 2 jars was to:


- avoid making promises to users on a work in progress runner (make it 
explicit with a different jar)


- avoid confusion for them (why are there 2 pipeline options? etc)

If the community believes that there is no confusion or wrong promises 
with the one jar solution, we could leave the 2 runners in one jar.


Maybe we could start a vote on that?

Etienne

On 31/10/2019 02:06, Kenneth Knowles wrote:
Very good points. We definitely ship a lot of code/features in very 
early stages, and there seems to be no problem.


I intend mostly to leave this judgment to people like you who know 
better about Spark users.


But I do think 1 or 2 jars is better than 3. I really don't like "3 
jars" and I did give two reasons:


1. diamond deps where things overlap
2. figuring out which thing to depend on

Both are annoying for users. I am not certain if it could lead to a 
real unsolvable situation. This is just a Java ecosystem problem so I 
feel qualified to comment.


I did also ask if there were major dependency differences between the 
two that could cause problems for users. This question was dropped and 
no one cares to comment so I assume it is not an issue. So then I 
favor having just 1 jar with both runners.


Kenn

On Wed, Oct 30, 2019 at 2:46 PM Ismaël Mejía > wrote:


I am still a bit lost about why we are discussing options without
giving any
arguments or reasons for the options? Why is 2 modules better than
3 or 3 better
than 2, or even better, what forces us to have something different
than a single
module?

What are the reasons for wanting to have separate jars? If the
issue is that the
code is unfinished or not passing the tests, the impact for end
users is minimal
because they cannot accidentally end up running the new runner,
and if they
decide to do so we can warn them it is at their own risk and not
ready for
production in the documentation + runner.

If the fear is that new code may end up being intertwined with the
classic and
portable runners and have some side effects. We have the
ValidatesRunner +
Nexmark in the CI to cover this so again I do not see what is the
problem that
requires modules to be separate.

If the issue is being uncomfortable about having in-progress code
in released
artifacts we have been doing this in Beam forever, for example
most of the work
on portability and Schema/SQL, and all of those were still part of
artifacts
long time before they were ready for prime use, so I still don't
see why this
case is different to require different artifacts.

I have the impression we are trying to solve a non-issue by adding
a lot of
artificial complexity (in particular to the users), or am I
missing something
else?

On Wed, Oct 30, 2019 at 7:40 PM Kenneth Knowles mailto:k...@apache.org>> wrote:
>
> Oh, I mean that we ship just 2 jars.
>
> And since Spark users always build an uber jar, they can still
depend on both of ours and be able to switch runners with a flag.
>
> I really dislike projects shipping overlapping jars. It is
confusing and causes major diamond dependency problems.
>
> Kenn
>
> On Wed, Oct 30, 2019 at 11:12 AM Alexey Romanenko
mailto:aromanenko@gmail.com>> wrote:
>>
>> Yes, agree, two jars included in uber jar will work in the
similar way. Though having 3 jars looks still quite confusing for me.
>>
>> On 29 Oct 2019, at 23:54, Kenneth Knowles mailto:k...@apache.org>> wrote:
>>
>> Is it just as easy to have two jars and build an uber jar with
both included? Then the runner can still be toggled with a flag.
>>
>> Kenn
>>
>> On Tue, Oct 29, 2019 at 9:38 AM Alexey Romanenko
mailto:aromanenko@gmail.com>> wrote:
>>>
>>> Hmm, I don’t think that jar size should play a big role
comparing to the whole size of shaded jar of users job. Even more,
I think it will be quite confusing for users to choose which jar
to use if we will have 3 different ones for similar purposes.
Though, let’s see what others think.
>>>
>>> On 29 Oct 2019, at 15:32, Etienne Chauchot
mailto:echauc...@apache.org>> wrote:
>>>
>>> Hi Alexey,
>>>
>>> Thanks for your opinion !
>>>
>>> Comments inline
>>>
>>> Etienne
>>>
>>> On 28/10/2019 17:34, Alexey Romanenko wrote:
>>>
>>> Let me share some of my thoughts on this.
>>>
>>>     - shall we filter out the package name from 

Re: Key encodings for state requests

2019-11-07 Thread Robert Bradshaw
On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels  wrote:
>
> Thanks for the feedback thus far. Some more comments:
>
> > Instead, the runner knows ahead of time that it
> > will need to instantiate this coder, and should update the bundle
> > processor to specify KvCoder,
> > VarIntCoder> as the coder so both can pull it out in a consistent way.
>
> By "update the bundle processor", do you mean modifying the
> ProcessBundleDescriptor's BagUserState with the correct key coder?
> Conceptually that is possible, but the current implementation does not
> allow for this to happen:
> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
> It enforces ByteString which does not tell the SDK Harness anything
> about the desired encoding.

I meant update the BundleProcessDescriptor proto that is sent to the
SDK 
https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L140,
essentially option (1).

> Since the above does not seem feasible, I see the following options:
>
> (1) Modify the pipeline Proto before the translation and wrap a
> LengthPrefixCoder around non-standard key coders for stateful
> transforms. This would change the encoding for the entire element, to be
> sure that the key coder for state requests contains a LengthPrefixCoder
> for state requests from the SDK Harness. Not optimal.

Yes. The contract should be that both the runner and SDK use the
coders that are specified in the proto. The runner controls the proto,
and should ensure it only sends protos it will be able to handle the
SDK responding to. I'm not seeing why this is sub-optimal.

> (2) Add a new method WireCoders#instantiateRunnerWireKeyCoder which
> returns the correct key coder, i.e. for standard coders, the concrete
> coder, and for non-standard coders a ByteArrayCoder. We also need to
> ensure the key encoding on the Runner side is OUTER context, to avoid
> adding a length prefix to the encoded bytes. Basically, the non-standard
> coders result in a NOOP coder which does not touch the key bytes.

I'd really like to avoid implicit agreements about how the coder that
should be used differs from what's specified in the proto in different
contexts.

> (3) Patch the Python SDK to ensure non-standard state key coders are
> always wrapped in a LengthPrefixCoder. That way, we can keep the
> existing logic on the Runner side.

The key concept here is not "standard coder" but "coder that the
runner does not understand." This knowledge is only in the runner.
Also has the downside of (2).

> Option (2) seems like the most practical.
>
> -Max
>
> On 06.11.19 17:26, Robert Bradshaw wrote:
> > On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels  wrote:
> >>
> >> Let me try to clarify:
> >>
> >>> The Coder used for State/Timers in a StatefulDoFn is pulled out of the
> >>> input PCollection. If a Runner needs to partition by this coder, it
> >>> should ensure the coder of this PCollection matches with the Coder
> >>> used to create the serialized bytes that are used for partitioning
> >>> (whether or not this is length-prefixed).
> >>
> >> That is essentially what I had assumed when I wrote the code. The
> >> problem is the coder can be "pulled out" in different ways.
> >>
> >> For example, let's say we have the following Proto PCollection coder
> >> with non-standard coder "CustomCoder" as the key coder:
> >>
> >> KvCoder
> >>
> >>   From the Runner side, this currently looks like the following:
> >>
> >> PCol: KvCoder, VarIntCoder>
> >> Key:  LengthPrefixCoder
> >
> > This is I think where the error is. When If the proto references
> > KvCoder it should not be pulled out as
> > KvCoder, VarIntCoder>; as that
> > doesn't have the same encoding. Trying to do instantiate such a coder
> > should be an error. Instead, the runner knows ahead of time that it
> > will need to instantiate this coder, and should update the bundle
> > processor to specify KvCoder,
> > VarIntCoder> as the coder so both can pull it out in a consistent way.
> >
> > When the coder is KvCoder, VarIntCoder>
> > instantiating it as KvCoder on the runner
> > is of course OK as they do have the same encoding.
> >
> >> At the SDK Harness, we have the coder available:
> >>
> >> PCol: KvCoder
> >> Key:  CustomCoder
> >>
> >> Currently, when the SDK Harness serializes a key for a state request,
> >> the custom coder may happen to add a length prefix, or it may not. It
> >> depends on the coder used. The correct behavior would be to use the same
> >> representation as on the Runner side.
> >>
> >>> Specifically, "We have no way of telling from the Runner side, if a 
> >>> length prefix has been used or not." seems false
> >>
> >> The Runner cannot inspect an unknown coder, it only has the opaque Proto
> >> information available which does not allow introspection of non-standard
> >> coders. 

Re: Key encodings for state requests

2019-11-07 Thread Robert Burke
While the Go SDK doesn't yet support a State API, Option 3) is what the Go
SDK does for all non-standard coders (aka custom coders) anyway.

While this means that for certain custom encodings of user types there may
be the overhead of length prefixing it, it's not likely to be the most
significant cost in encoding values.

On Thu, Nov 7, 2019, 6:26 AM Maximilian Michels  wrote:

> Thanks for the feedback thus far. Some more comments:
>
> > Instead, the runner knows ahead of time that it
> > will need to instantiate this coder, and should update the bundle
> > processor to specify KvCoder,
> > VarIntCoder> as the coder so both can pull it out in a consistent way.
>
> By "update the bundle processor", do you mean modifying the
> ProcessBundleDescriptor's BagUserState with the correct key coder?
> Conceptually that is possible, but the current implementation does not
> allow for this to happen:
>
> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
> It enforces ByteString which does not tell the SDK Harness anything
> about the desired encoding.
>
> Since the above does not seem feasible, I see the following options:
>
> (1) Modify the pipeline Proto before the translation and wrap a
> LengthPrefixCoder around non-standard key coders for stateful
> transforms. This would change the encoding for the entire element, to be
> sure that the key coder for state requests contains a LengthPrefixCoder
> for state requests from the SDK Harness. Not optimal.
>
> (2) Add a new method WireCoders#instantiateRunnerWireKeyCoder which
> returns the correct key coder, i.e. for standard coders, the concrete
> coder, and for non-standard coders a ByteArrayCoder. We also need to
> ensure the key encoding on the Runner side is OUTER context, to avoid
> adding a length prefix to the encoded bytes. Basically, the non-standard
> coders result in a NOOP coder which does not touch the key bytes.
>
> (3) Patch the Python SDK to ensure non-standard state key coders are
> always wrapped in a LengthPrefixCoder. That way, we can keep the
> existing logic on the Runner side.
>
>
> Option (2) seems like the most practical.
>
> -Max
>
> On 06.11.19 17:26, Robert Bradshaw wrote:
> > On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels 
> wrote:
> >>
> >> Let me try to clarify:
> >>
> >>> The Coder used for State/Timers in a StatefulDoFn is pulled out of the
> >>> input PCollection. If a Runner needs to partition by this coder, it
> >>> should ensure the coder of this PCollection matches with the Coder
> >>> used to create the serialized bytes that are used for partitioning
> >>> (whether or not this is length-prefixed).
> >>
> >> That is essentially what I had assumed when I wrote the code. The
> >> problem is the coder can be "pulled out" in different ways.
> >>
> >> For example, let's say we have the following Proto PCollection coder
> >> with non-standard coder "CustomCoder" as the key coder:
> >>
> >> KvCoder
> >>
> >>   From the Runner side, this currently looks like the following:
> >>
> >> PCol: KvCoder, VarIntCoder>
> >> Key:  LengthPrefixCoder
> >
> > This is I think where the error is. When If the proto references
> > KvCoder it should not be pulled out as
> > KvCoder, VarIntCoder>; as that
> > doesn't have the same encoding. Trying to do instantiate such a coder
> > should be an error. Instead, the runner knows ahead of time that it
> > will need to instantiate this coder, and should update the bundle
> > processor to specify KvCoder,
> > VarIntCoder> as the coder so both can pull it out in a consistent way.
> >
> > When the coder is KvCoder, VarIntCoder>
> > instantiating it as KvCoder on the runner
> > is of course OK as they do have the same encoding.
> >
> >> At the SDK Harness, we have the coder available:
> >>
> >> PCol: KvCoder
> >> Key:  CustomCoder
> >>
> >> Currently, when the SDK Harness serializes a key for a state request,
> >> the custom coder may happen to add a length prefix, or it may not. It
> >> depends on the coder used. The correct behavior would be to use the same
> >> representation as on the Runner side.
> >>
> >>> Specifically, "We have no way of telling from the Runner side, if a
> length prefix has been used or not." seems false
> >>
> >> The Runner cannot inspect an unknown coder, it only has the opaque Proto
> >> information available which does not allow introspection of non-standard
> >> coders. With the current state, the Runner may think the coder adds a
> >> length prefix but the Python SDK worker could choose to add none. This
> >> produces an inconsistent key encoding. See above.
> >
> > I think what's being conflated here is "the Coder has been wrapped in
> > a LengthPrefixCoder" vs. "the coder does length prefixing." These are
> > two orthogonal concepts. The runner in general only knows the former.
> >
> >> It looks like the key 

Re: [PROPOSAL] Storing, displaying and detecting anomalies in test results

2019-11-07 Thread Kamil Wasilewski
Hi all,

For a while we have been working on the implementation of proposal
presented in this thread. Just to remind what it was about in short words —
we wanted to use Prometheus and Grafana to visualize Beam performance tests
results in addition to detect regressions automatically. A couple of
obstacles forced us to change our approach and that's why I want to share
with you a major proposal revision which you can find here:
https://s.apache.org/test-metrics-storage-corrected.

As usual, we are open to your suggestions and thoughts.

Thanks,
Kamil

On Thu, Sep 5, 2019 at 5:11 PM Kamil Wasilewski <
kamil.wasilew...@polidea.com> wrote:

> We've just started working on this and here comes the first PR:
> https://github.com/apache/beam/pull/9482.
>
> Feel free to share your thoughts and ideas.
>
> Kamil
>
> On Tue, Aug 27, 2019 at 1:19 AM Pablo Estrada  wrote:
>
>> Thanks Kamil for bringing this!
>> +Manisha Bhardwaj  +Mark Liu  have
>> worked on internal benchmarking at Google - would you take a look please?
>>
>> On Fri, Aug 23, 2019 at 3:22 AM Kamil Wasilewski <
>> kamil.wasilew...@polidea.com> wrote:
>>
>>> Hi all,
>>>
>>> Recently we did some research on how to visualize IO performance tests,
>>> Nexmark and Load test results better and how to detect regressions
>>> automatically in an easy way using tools dedicated for the job.
>>>
>>> We'd like to share a proposal with you:
>>> 
>>> https://s.apache.org/test-metrics-storage
>>>
>>>
>>>
>>> Any comments are highly appreciated.
>>>
>>> Thanks,
>>>
>>> Kamil
>>>
>>


Re: [Discuss] Beam Summit 2020 Dates & locations

2019-11-07 Thread Elliotte Rusty Harold
The U.S. sadly is not a reliable destination for international
conferences these days. Almost every conference I go to, big and
small, has at least one speaker, sometimes more, who can't get into
the country. Canada seems worth considering. Vancouver, Montreal, and
Toronto are all convenient.

On Wed, Nov 6, 2019 at 2:17 PM Griselda Cuevas  wrote:
>
> Hi Beam Community!
>
> I'd like to kick off a thread to discuss potential dates and venues for the 
> 2020 Beam Summits.
>
> I did some research on industry conferences happening in 2020 and 
> pre-selected a few ranges as follows:
>
> (2 days) NA between mid-May and mid-June
> (2 days) EU mid October
> (1 day) Asia Mini Summit:  March
>
> I'd like to hear your thoughts on these dates and get consensus on exact 
> dates as the convo progresses.
>
> For locations these are the options I reviewed:
>
> NA: Austin Texas, Berkeley California, Mexico City.
> Europe: Warsaw, Barcelona, Paris
> Asia: Singapore
>
> Let the discussion begin!
> G (on behalf of the Beam Summit Steering Committee)
>
>
>


-- 
Elliotte Rusty Harold
elh...@ibiblio.org


Re: [VOTE] @RequiresTimeSortedInput stateful DoFn annotation

2019-11-07 Thread Jan Lukavský
Hi,is there anything I can do to make this more attractive? :-) Any feedback would be much appreciated.Many thanks, JanDne 5. 11. 2019 14:10 napsal uživatel Jan Lukavský :Hi,

I'd like to open a vote on accepting design document [1] as a base for 
implementation of @RequiresTimeSortedInput annotation for stateful 
DoFns. Associated JIRA [2] and PR [3] contains only subset of the whole 
functionality (allowed lateness ignored and no possibility to specify 
UDF for time - or sequential number - to be extracted from data). The PR 
will be subject to independent review process (please feel free to 
self-request review if you are interested in this) after the vote would 
eventually succeed. Missing features from the design document will be 
added later in subsequent JIRA issues, so that it doesn't block 
availability of this feature.

Please vote on adding support for @RequiresTimeSortedInput.

The vote is open for the next 72 hours and passes if at least three +1 
and no -1 PMC (binding) votes are cast.

[ ] +1 Add support for @RequiresTimeSortedInput

[ ] 0 I don't have a strong opinion about this, but I assume it's ok

[ ] -1 Do not support @RequiresTimeSortedInput - please provide explanation.

Thanks,

  Jan

[1] 
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing

[2] https://issues.apache.org/jira/browse/BEAM-8550

[3] https://github.com/apache/beam/pull/8774




Permission to contribute on LZO compression enablement for Beam Java SDK

2019-11-07 Thread Amogh Tiwari
Hi,

I would like to contribute on enabling Apache Beam's java SDK to work with
LZO compression. Please add me as a contributor so that I can work on this.
I've also raised a ticket 
for the same.

Thanks and best regards,
Amogh Tiwari


Re: (Question) SQL integration tests for MongoDb

2019-11-07 Thread Michał Walenia
Hi,

What exactly are you trying to do? If you're looking for a way to provide
pipeline options to the MongoDBIOIT, you can pass them via command line
like this:

./gradlew integrationTest -p sdks/java/io/mongodb



* -DintegrationTestPipelineOptions='[   "--mongoDBHostName=1.2.3.4",
 "--mongoDBPort=27017",   "--mongoDBDatabaseName=mypass",
 "--numberOfRecords=1000" ]'*
   --tests org.apache.beam.sdk.io.mongodb.MongoDbIOIT
   -DintegrationTestRunner=direct

Gradle tasks created with `enableJavaPerformanceTesting()` will allow such
options to be passed.

If you're trying to do something else, please let me know.

Regards
Michal

On Thu, Nov 7, 2019 at 1:44 AM Kirill Kozlov 
wrote:

> Hi everyone!
>
> I am trying to test MongoDb Sql Table, but not quite sure how to pass
> pipeline options with the hostName, port, and databaseName used by Jenkins.
>
> It looks like the integration test for MongoDbIO Connector obtain those
> values from the
> 'beam/.test-infra/jenkins/job_PerformanceTests_MongoDBIO_IT.groovy' file
> via calling the following methods in the 'gradle.build' file:
> provideIntegrationTestingDependencies()
> enableJavaPerformanceTesting()
>
> Sql build file already has a task with the name 'integrationTest' defined
> and does not let us do `enableJavaPerformanceTesting()`.
>
>  I would really appreciate if someone could provide me with a couple of
> pointers on getting this to work.
>
> -
> Kirill
>


-- 

Michał Walenia
Polidea  | Software Engineer

M: +48 791 432 002 <+48791432002>
E: michal.wale...@polidea.com

Unique Tech
Check out our projects! 


Re: [DISCUSS] Avoid redundant encoding and decoding between runner and harness

2019-11-07 Thread jincheng sun
Thanks for your feedback and the valuable comments, Kenn & Robert!

I think your comments are more comprehensive and enlighten me a lot. The
two proposals which I mentioned above are to reuse the existing coder
(FullWindowedValueCoder and ValueOnlyWindowedValueCoder). Now, with your
comments, I think we can further abstract 'FullWindowedValueCoder' and
'ValueOnlyWindowedValueCoder', that is, we can rename
'FullWindowedValueCoder' to 'WindowedValueCoder', and make the coders of
window/timestamp/pane configurable. Then we can remove
'ValueOnlyWindowedValueCoder' and need to add a mount of constant coders
for window/timestamp/pane.

I have replied your comments on the doc, and quick feedback as following:

Regarding to "Approach 2: probably no SDK harness work / compatible with
existing Beam model so no risk of introducing inconsistency",if we "just
puts default window/timestamp/pane info on elements" and don't change the
original coder, then the performance is not optimized. If we want to get
the best performance, then the default coder of Window/timestamp/pane
should be constant coder. In this case the SDK harnesses need to be aware
of the constant coder and there will be some development work in the SDK
harness. Besides, the SDK harness also needs to make the coders for
window/timestamp/pane configurable and this will introduce some related
changes, such as updating WindowedValueCoder._get_component_coders, etc.

Regarding to "Approach 1: option a: if the SDK harness has to understand
'values without windows' then very large changes and high risk of
introducing inconsistency (I eliminated many of these inconsistencies)", we
only need to add ValueOnlyWindowedValueCoder to the StandardCoders and all
the SDK harness should be aware of this coder. There is no much changes
actually.

Please feel free to correct me if there is anyting incorrect. :)

Besides, I'm not quite clear about the consistency issues you meant here.
Could you please give me some hints about this?

Best,
Jincheng

Robert Bradshaw  于2019年11月7日周四 上午3:38写道:

> Yes, the portability framework is designed to support this, and
> possibly even more efficient transfers of data than element-by-element
> as per the wire coder specified in the IO port operators. I left some
> comments on the doc as well, and would also prefer approach 2.
>
> On Wed, Nov 6, 2019 at 11:03 AM Kenneth Knowles  wrote:
> >
> > I think the portability framework is designed for this. The runner
> controls the coder on the grpc ports and the runner controls the process
> bundle descriptor.
> >
> > I commented on the doc. I think what is missing is analysis of scope of
> SDK harness changes and risk to model consistency
> >
> > Approach 2: probably no SDK harness work / compatible with existing
> Beam model so no risk of introducing inconsistency
> >
> > Approach 1: what are all the details?
> > option a: if the SDK harness has to understand "values without
> windows" then very large changes and high risk of introducing inconsistency
> (I eliminated many of these inconsistencies)
> > option b: if the coder just puts default window/timestamp/pane
> info on elements, then it is the same as approach 2, no work / no risk
> >
> > Kenn
> >
> > On Wed, Nov 6, 2019 at 1:09 AM jincheng sun 
> wrote:
> >>
> >> Hi all,
> >>
> >> I am trying to make some improvements of portability framework to make
> it usable in other projects. However, we find that the coder between runner
> and harness can only be FullWindowedValueCoder. This means each time when
> sending a WindowedValue, we have to encode/decode timestamp, windows and
> pan infos. In some circumstances(such as using the portability framework in
> Flink), only values are needed between runner and harness. So, it would be
> nice if we can configure the coder and avoid redundant encoding and
> decoding between runner and harness to improve the performance.
> >>
> >> There are two approaches to solve this issue:
> >>
> >> Approach 1:  Support ValueOnlyWindowedValueCoder between runner and
> harness.
> >> Approach 2:  Add a "constant" window coder that embeds all the
> windowing information as part of the coder that should be used to wrap the
> value during decoding.
> >>
> >> More details can be found here [1].
> >>
> >> As of the shortcomings of “Approach 2” which still need to
> encode/decode timestamp and pane infos, we tend to choose “Approach 1”
> which brings better performance and is more thorough.
> >>
> >> Welcome any feedback :)
> >>
> >> Best,
> >> Jincheng
> >>
> >> [1]
> https://docs.google.com/document/d/1TTKZC6ppVozG5zV5RiRKXse6qnJl-EsHGb_LkUfoLxY/edit?usp=sharing
> >>
>