Re: Key encodings for state requests

2019-11-08 Thread Robert Burke
The SDKs need to know each of the coders defined in the proto. Go and
Python can't use the Java coders. Making a standard definition for the
coder, adding it to the proto enum, and implementing that coder in each SDK
is what makes the coders standard.

In other words, the Java model coders are the implementation of the
standard coders as defined by the proto file.


On Fri, Nov 8, 2019, 10:04 PM jincheng sun  wrote:

>
> > Let us first define what are "standard coders". Usually it should be the
> coders defined in the Proto. However, personally I think the coders defined
> in the Java ModelCoders [1] seems more appropriate. The reason is that for
> a coder which has already appeared in Proto and still not added to the Java
> ModelCoders, it's always replaced by the runner with
> LengthPrefixCoder[ByteArrayCoder] and so the SDK harness will decode the
> data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder.
> That's to say, that coder does not still take effect in the SDK harness.
> Only when the coder is added in ModelCoders, it's 'known' and will take
> effect.
>
> Correct this point! The coder which is not contained in the Java
> ModelCoders is replaced with LengthPrefixCoder[ByteArrayCoder] at runner
> side and LengthPrefixCoder[CustomCoder] at SDK harness side.
>
> The point here is that the runner determines whether it knows the coder
> according to the coders defined in the Java ModelCoders, not the coders
> defined in the proto file. So if taking option 3, the non-standard coders
> which will be wrapped with LengthPrefixCoder should also be determined by
> the coders defined in the Java ModerCoders, not the coders defined in the
> proto file.
>
> jincheng sun 于2019年11月9日 周六12:26写道:
>
>> Hi Robert Bradshaw,
>>
>> Thanks a lot for the explanation. Very interesting topic!
>>
>> Let us first define what are "standard coders". Usually it should be the
>> coders defined in the Proto. However, personally I think the coders defined
>> in the Java ModelCoders [1] seems more appropriate. The reason is that for
>> a coder which has already appeared in Proto and still not added to the Java
>> ModelCoders, it's always replaced by the runner with
>> LengthPrefixCoder[ByteArrayCoder] and so the SDK harness will decode the
>> data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder.
>> That's to say, that coder does not still take effect in the SDK harness.
>> Only when the coder is added in ModelCoders, it's 'known' and will take
>> effect.
>>
>> So if we take option 3, the non-standard coders which will be wrapped
>> with LengthPrefixCoder should be synced with the coders defined in the Java
>> ModerCoders. (From this point of view, option 1 seems more clean!)
>>
>> Please correct me if I missed something. Thanks a lot!
>>
>> Best,
>> Jincheng
>>
>> [1]
>> https://github.com/apache/beam/blob/01726e9c62313749f9ea7c93063a1178abd1a8db/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L59
>>
>> Robert Burke  于2019年11月9日周六 上午8:46写道:
>>
>>> And by "I wasn't clear" I meant "I misread the options".
>>>
>>> On Fri, Nov 8, 2019, 4:14 PM Robert Burke  wrote:
>>>
 Reading back, I wasn't clear: the Go SDK does Option (1), putting the
 LP explicitly during encoding [1] for the runner proto, and explicitly
 expects LPs to contain a custom coder URN on decode for execution [2].
 (Modulo an old bug in Dataflow where the urn was empty)


 [1]
 https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348
 [2]
 https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219


 On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw 
 wrote:

> On Fri, Nov 8, 2019 at 2:09 AM jincheng sun 
> wrote:
> >
> > Hi,
> >
> > Sorry for my late reply. It seems the conclusion has been reached. I
> just want to share my personal thoughts.
> >
> > Generally, both option 1 and 3 make sense to me.
> >
> > >> The key concept here is not "standard coder" but "coder that the
> > >> runner does not understand." This knowledge is only in the runner.
> > >> Also has the downside of (2).
> >
> > >Yes, I had assumed "non-standard" and "unknown" are the same, but
> the
> > >latter can be a subset of the former, i.e. if a Runner does not
> support
> > >all of the standard coders for some reason.
> >
> > I'm also assume that "non-standard" and "unknown" are the same.
> Currently, in the runner side[1] it
> > decides whether the coder is unknown(wrap with length prefix coder)
> according to whether the coder is among
> > the standard coders. It will not communicate with harness to make
> this decision.
> >
> > So, from my point of view, we can update the PR according to option
> 1 

Re: Key encodings for state requests

2019-11-08 Thread jincheng sun
> Let us first define what are "standard coders". Usually it should be the
coders defined in the Proto. However, personally I think the coders defined
in the Java ModelCoders [1] seems more appropriate. The reason is that for
a coder which has already appeared in Proto and still not added to the Java
ModelCoders, it's always replaced by the runner with
LengthPrefixCoder[ByteArrayCoder] and so the SDK harness will decode the
data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder.
That's to say, that coder does not still take effect in the SDK harness.
Only when the coder is added in ModelCoders, it's 'known' and will take
effect.

Correct this point! The coder which is not contained in the Java
ModelCoders is replaced with LengthPrefixCoder[ByteArrayCoder] at runner
side and LengthPrefixCoder[CustomCoder] at SDK harness side.

The point here is that the runner determines whether it knows the coder
according to the coders defined in the Java ModelCoders, not the coders
defined in the proto file. So if taking option 3, the non-standard coders
which will be wrapped with LengthPrefixCoder should also be determined by
the coders defined in the Java ModerCoders, not the coders defined in the
proto file.

jincheng sun 于2019年11月9日 周六12:26写道:

> Hi Robert Bradshaw,
>
> Thanks a lot for the explanation. Very interesting topic!
>
> Let us first define what are "standard coders". Usually it should be the
> coders defined in the Proto. However, personally I think the coders defined
> in the Java ModelCoders [1] seems more appropriate. The reason is that for
> a coder which has already appeared in Proto and still not added to the Java
> ModelCoders, it's always replaced by the runner with
> LengthPrefixCoder[ByteArrayCoder] and so the SDK harness will decode the
> data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder.
> That's to say, that coder does not still take effect in the SDK harness.
> Only when the coder is added in ModelCoders, it's 'known' and will take
> effect.
>
> So if we take option 3, the non-standard coders which will be wrapped with
> LengthPrefixCoder should be synced with the coders defined in the Java
> ModerCoders. (From this point of view, option 1 seems more clean!)
>
> Please correct me if I missed something. Thanks a lot!
>
> Best,
> Jincheng
>
> [1]
> https://github.com/apache/beam/blob/01726e9c62313749f9ea7c93063a1178abd1a8db/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L59
>
> Robert Burke  于2019年11月9日周六 上午8:46写道:
>
>> And by "I wasn't clear" I meant "I misread the options".
>>
>> On Fri, Nov 8, 2019, 4:14 PM Robert Burke  wrote:
>>
>>> Reading back, I wasn't clear: the Go SDK does Option (1), putting the LP
>>> explicitly during encoding [1] for the runner proto, and explicitly expects
>>> LPs to contain a custom coder URN on decode for execution [2]. (Modulo an
>>> old bug in Dataflow where the urn was empty)
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348
>>> [2]
>>> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219
>>>
>>>
>>> On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw 
>>> wrote:
>>>
 On Fri, Nov 8, 2019 at 2:09 AM jincheng sun 
 wrote:
 >
 > Hi,
 >
 > Sorry for my late reply. It seems the conclusion has been reached. I
 just want to share my personal thoughts.
 >
 > Generally, both option 1 and 3 make sense to me.
 >
 > >> The key concept here is not "standard coder" but "coder that the
 > >> runner does not understand." This knowledge is only in the runner.
 > >> Also has the downside of (2).
 >
 > >Yes, I had assumed "non-standard" and "unknown" are the same, but the
 > >latter can be a subset of the former, i.e. if a Runner does not
 support
 > >all of the standard coders for some reason.
 >
 > I'm also assume that "non-standard" and "unknown" are the same.
 Currently, in the runner side[1] it
 > decides whether the coder is unknown(wrap with length prefix coder)
 according to whether the coder is among
 > the standard coders. It will not communicate with harness to make
 this decision.
 >
 > So, from my point of view, we can update the PR according to option 1
 or 3.
 >
 > [1]
 https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62

 That list is populated in Java code [1] and has typically been a
 subset of what is in the proto file. Things like StringUtf8Coder and
 DoubleCoder have been added at different times to different SDKs and
 Runners, sometimes long after the URN is in the proto. Having to keep
 this list synchronized 

Re: Key encodings for state requests

2019-11-08 Thread jincheng sun
Hi Robert Bradshaw,

Thanks a lot for the explanation. Very interesting topic!

Let us first define what are "standard coders". Usually it should be the
coders defined in the Proto. However, personally I think the coders defined
in the Java ModelCoders [1] seems more appropriate. The reason is that for
a coder which has already appeared in Proto and still not added to the Java
ModelCoders, it's always replaced by the runner with
LengthPrefixCoder[ByteArrayCoder] and so the SDK harness will decode the
data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder.
That's to say, that coder does not still take effect in the SDK harness.
Only when the coder is added in ModelCoders, it's 'known' and will take
effect.

So if we take option 3, the non-standard coders which will be wrapped with
LengthPrefixCoder should be synced with the coders defined in the Java
ModerCoders. (From this point of view, option 1 seems more clean!)

Please correct me if I missed something. Thanks a lot!

Best,
Jincheng

[1]
https://github.com/apache/beam/blob/01726e9c62313749f9ea7c93063a1178abd1a8db/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L59

Robert Burke  于2019年11月9日周六 上午8:46写道:

> And by "I wasn't clear" I meant "I misread the options".
>
> On Fri, Nov 8, 2019, 4:14 PM Robert Burke  wrote:
>
>> Reading back, I wasn't clear: the Go SDK does Option (1), putting the LP
>> explicitly during encoding [1] for the runner proto, and explicitly expects
>> LPs to contain a custom coder URN on decode for execution [2]. (Modulo an
>> old bug in Dataflow where the urn was empty)
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348
>> [2]
>> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219
>>
>>
>> On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw 
>> wrote:
>>
>>> On Fri, Nov 8, 2019 at 2:09 AM jincheng sun 
>>> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Sorry for my late reply. It seems the conclusion has been reached. I
>>> just want to share my personal thoughts.
>>> >
>>> > Generally, both option 1 and 3 make sense to me.
>>> >
>>> > >> The key concept here is not "standard coder" but "coder that the
>>> > >> runner does not understand." This knowledge is only in the runner.
>>> > >> Also has the downside of (2).
>>> >
>>> > >Yes, I had assumed "non-standard" and "unknown" are the same, but the
>>> > >latter can be a subset of the former, i.e. if a Runner does not
>>> support
>>> > >all of the standard coders for some reason.
>>> >
>>> > I'm also assume that "non-standard" and "unknown" are the same.
>>> Currently, in the runner side[1] it
>>> > decides whether the coder is unknown(wrap with length prefix coder)
>>> according to whether the coder is among
>>> > the standard coders. It will not communicate with harness to make this
>>> decision.
>>> >
>>> > So, from my point of view, we can update the PR according to option 1
>>> or 3.
>>> >
>>> > [1]
>>> https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62
>>>
>>> That list is populated in Java code [1] and has typically been a
>>> subset of what is in the proto file. Things like StringUtf8Coder and
>>> DoubleCoder have been added at different times to different SDKs and
>>> Runners, sometimes long after the URN is in the proto. Having to keep
>>> this list synchronized (and versioned) would be a regression.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
>>>
>>> The PR taking approach (1) looks good at a first glance (I see others
>>> are reviewing it). Thanks.
>>>
>>> > Maximilian Michels  于2019年11月8日周五 上午3:35写道:
>>> >>
>>> >> > While the Go SDK doesn't yet support a State API, Option 3) is what
>>> the Go SDK does for all non-standard coders (aka custom coders) anyway.
>>> >>
>>> >> For wire transfer, the Java Runner also adds a LengthPrefixCoder for
>>> the
>>> >> coder and its subcomponents. The problem is that this is an implicit
>>> >> assumption made. In the Proto, we do not have this represented. This
>>> is
>>> >> why **for state requests**, we end up with a
>>> >> "LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on
>>> >> the SDK Harness side. Note that the Python Harness does wrap unknown
>>> >> coders in a LengthPrefixCoder for transferring regular elements, but
>>> the
>>> >> LengthPrefixCoder is not preserved for the state requests.
>>> >>
>>> >> In that sense (3) is good because it follows this implicit notion of
>>> >> adding a LengthPrefixCoder for wire transfer, but applies it to state
>>> >> requests.
>>> >>
>>> >> However, option (1) is most reliable because the 

Re: Deprecate some or all of TestPipelineOptions?

2019-11-08 Thread Brian Hulette
Thanks everyone for the responses. I put up a WIP PR [1] that removes
OnCreateMatcher and OnSuccessMatcher.

[1] https://github.com/apache/beam/pull/10056

On Fri, Nov 8, 2019 at 9:48 AM Luke Cwik  wrote:

> It can be marked as deprecated and we can remove its usage everywhere but
> leave this interface and mark it for removal at some future time.
>
> On Thu, Nov 7, 2019 at 2:23 PM Ismaël Mejía  wrote:
>
>> Thanks for bringing this to the ML Brian
>>
>> +1 For full TestPipelineOptions deprecation. Even worth to remove it,
>> bad part is that this class resides in 'sdks/core/main/java' and not
>> in testing as I imagined so this could count as a 'breaking' change.
>>
>> On Thu, Nov 7, 2019 at 8:27 PM Luke Cwik  wrote:
>> >
>> > There was issue with asynchrony of p.run(), some runners blocked till
>> the pipeline was complete with p.run() which was never meant to be the
>> intent.
>> >
>> > The test timeout one makes sense to be able to configure it per runner
>> (since Dataflow takes a lot longer than other runners) but we may be able
>> to configure a Junit test timeout attribute instead.
>> >
>> > I would be for getting rid of them.
>> >
>> >
>> > On Wed, Nov 6, 2019 at 3:36 PM Robert Bradshaw 
>> wrote:
>> >>
>> >> +1 to all of these are probably obsolete at this point and would be
>> >> nice to remove.
>> >>
>> >>
>> >> On Wed, Nov 6, 2019 at 3:00 PM Kenneth Knowles 
>> wrote:
>> >> >
>> >> > Good find. I think TestPipelineOptions is from very early days. It
>> makes sense to me that these are all obsolete. Some guesses, though I
>> haven't dug through commit history to confirm:
>> >> >
>> >> >  - TempRoot: a while ago TempLocation was optional, so I think this
>> would provide a default for things like gcpTempLocation and stagingLocation
>> >> >  - OnSuccessMatcher: for runners where pipeline used to not
>> terminate in streaming mode. Now I think every runner can successfully
>> waitUntilFinish. Also the current API for waitUntilFinish went through some
>> evolutions around asynchrony so it wasn't always a good choice.
>> >> >  - OnCreateMatcher: just for symmetry? I don't know
>> >> >  - TestTimeoutSeconds: probably also for the
>> asychrony/waitUntilfinish issue
>> >> >
>> >> > Kenn
>> >> >
>> >> > On Wed, Nov 6, 2019 at 12:19 PM Brian Hulette 
>> wrote:
>> >> >>
>> >> >> I recently came across TestPipelineOptions, and now I'm wondering
>> if maybe it should be deprecated. It only seems to actually be supported
>> for Spark and Dataflow (via TestSparkRunner and TestDataflowRunner), and I
>> think it may make more sense to move the functionality it provides into the
>> tests that need it.
>> >> >>
>> >> >> TestPipelineOptions currently has four attributes:
>> >> >>
>> >> >> # TempRoot
>> >> >> It's purpose isn't documented, but many tests read TempRoot and use
>> it to set a TempLocation (example). I think this attribute makes sense
>> (e.g. we can set TempRoot once and each test has its own subdirectory), but
>> I'm not sure. Can anyone confirm the motivation for it? I'd like to at
>> least add a docstring for it.
>> >> >>
>> >> >> # OnCreateMatcher
>> >> >> A way to register a matcher that will be checked right after a
>> pipeline has started. It's never set except for in TestDataflowRunnerTest,
>> so I think this is absolutely safe to remove.
>> >> >>
>> >> >> # OnSuccessMatcher
>> >> >> A way to register a matcher that will be checked right after a
>> pipeline has successfully completed. This is used in several tests
>> (RequiresStableInputIT, WordCountIT, ... 8 total occurrences), but I don't
>> see why they couldn't all be replaced with a `p.run().waitUntilFinish()`,
>> followed by an assert.
>> >> >>
>> >> >> I think the current approach is actually dangerous, because running
>> these tests with runners other than TestDataflowRunner or TestSparkRunner
>> means the matchers are never actually checked. This is actually how I came
>> across TestPipelineOptions - I tried running a test with the DirectRunner
>> and couldn't make it fail.
>> >> >>
>> >> >> # TestTimeoutSeconds
>> >> >> Seems to just be a wrapper for `waitUntilFinish(duration)`, and
>> only used in one place. I think it would be cleaner for the test to be
>> responsible for calling waitUntilFinish (which we do elsewhere), the only
>> drawback is it requires a small refactor so the test has access to the
>> PipelineResult object.
>> >> >>
>> >> >>
>> >> >> So I have a couple of questions for the community
>> >> >> 1) Are there thoughts on TempRoot? Can we get rid of it?
>> >> >> 2) Are there any objections to removing the other three attributes?
>> Am I missing something? Unless there are any objections I think I'll write
>> a patch to remove them.
>> >> >>
>> >> >> Thanks,
>> >> >> Brian
>>
>


Re: The state of external transforms in Beam

2019-11-08 Thread Chamikara Jayalath
Send https://github.com/apache/beam/pull/10054 to update the roadmap.

Thanks,
Cham

On Mon, Nov 4, 2019 at 10:24 AM Chamikara Jayalath 
wrote:

> Makes sense.
>
> I can look into expanding on what we have at following location and adding
> links to some of the existing work as a first step.
> https://beam.apache.org/roadmap/connectors-multi-sdk/
>
> Created https://issues.apache.org/jira/browse/BEAM-8553
>
> We also need more detailed documentation for cross-language transforms but
> that can be separate (and hopefully with help from tech writers who have
> been helping with Beam documentation in general).
>
> Thanks,
> Cham
>
>
> On Sun, Nov 3, 2019 at 7:16 PM Thomas Weise  wrote:
>
>> This thread was very helpful to find more detail in
>> https://jira.apache.org/jira/browse/BEAM-7870
>>
>> It would be great to have cross-language current state mentioned as top
>> level entry on https://beam.apache.org/roadmap/
>>
>>
>> On Mon, Sep 16, 2019 at 6:07 PM Chamikara Jayalath 
>> wrote:
>>
>>> Thanks for the nice write up Chad.
>>>
>>> On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw 
>>> wrote:
>>>
 Thanks for bringing this up again. My thoughts on the open questions
 below.

 On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova 
 wrote:
 > That commit solves 2 problems:
 >
 > Adds the pubsub Java deps so that they’re available in our portable
 pipeline
 > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
 available as a standard coder. This is required because both PubsubIO.Read
 and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
 objects, but only “standard” (i.e. portable) coders can be used, so we have
 to hack it to make PubsubMessage appear as a standard coder.
 >
 > More details:
 >
 > There’s a similar magic commit required for Kafka external transforms
 > The Jira issue for this problem is here:
 https://jira.apache.org/jira/browse/BEAM-7870
 > For problem #2 above there seems to be some consensus forming around
 using Avro or schema/row coders to send compound types in a portable way.
 Here’s the PR for making row coders portable
 > https://github.com/apache/beam/pull/9188

 +1. Note that this doesn't mean that the IO itself must produce rows;
 part of the Schema work in Java is to make it easy to automatically
 convert from various Java classes to schemas transparently, so this
 same logic that would allow one to apply an SQL filter directly to a
 Kafka/PubSub read would allow cross-language. Even if that doesn't
 work, we need not uglify the Java API; we can have an
 option/alternative transform that appends the convert-to-Row DoFn for
 easier use by external (though the goal of the former work is to make
 this step unnecissary).

>>>
>>> Updating all IO connectors / transforms to have a version that
>>> produces/consumes a PCollection is infeasible so I agree that we need
>>> an automatic conversion to/from PCollection possibly by injecting
>>> PTransfroms during ExternalTransform expansion.
>>>

 > I don’t really have any ideas for problem #1

 The crux of the issue here is that the jobs API was not designed with
 cross-language in mind, and so the artifact API ties artifacts to jobs
 rather than to environments. To solve this we need to augment the
 notion of environment to allow the specification of additional
 dependencies (e.g. jar files in this specific case, or better as
 maven/pypi/... dependencies (with version ranges) such that
 environment merging and dependency resolution can be sanely done), and
 a way for the expansion service to provide such dependencies.

 Max wrote up a summary of the prior discussions at

 https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8

 In the short term, one can build a custom docker image that has all
 the requisite dependencies installed.

 This touches on a related but separable issue that one may want to run
 some of these transforms "natively" in the same process as the runner
 (e.g. a Java IO in the Flink Java Runner) rather than via docker.
 (Similarly with subprocess.) Exactly how that works with environment
 specifications is also a bit TBD, but my proposal has been that these
 are best viewed as runner-specific substitutions of standard
 environments.

>>>
>>> We need a permanent solution for this but for now we have a temporary
>>> solution where additional jar files can be specified through an experiment
>>> when running a Python pipeline:
>>> https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
>>>
>>> Thanks,
>>> Cham
>>>
>>>

 > So the portability expansion system works, and now it’s time to sand
 off some of 

Re: Python Precommit duration pushing 2 hours

2019-11-08 Thread Ahmet Altay
I looked at the log but I could not figure what is causing the timeout
because the gradle scan links are missing. I sampled a few of the
successful jobs, It seems like python 3.7 and python 2 are running 3 tests
in serial {interactive, py37cython, py37gcp} and {docs, py27cython,
py27gcp} respectively. These two versions are pushing the total time
because other variants are now only running {cython, gcp} versions.

I suggest breaking up docs, and interactive into 2 separate suites of their
own. docs is actually faster than interactive,just separating that out to a
new suite might help.

Interactive was recently added
(https://github.com/apache/beam/pull/9741). +Ning
Kang  could you separate interactive to new suite?

Ahmet

On Fri, Nov 8, 2019 at 11:09 AM Robert Bradshaw  wrote:

> Just saw another 2-hour timeout:
> https://builds.apache.org/job/beam_PreCommit_Python_Commit/9440/ , so
> perhaps we're not out of the woods yet (though in general things have
> been a lot better).
>
> On Tue, Nov 5, 2019 at 10:52 AM Ahmet Altay  wrote:
> >
> > GCP tests are already on separate locations. IO related tests are under
> /sdks/python/apache_beam/io/gcp and Dataflow related tests are under
> sdks/python/apache_beam/runners/dataflow. It should be a matter of changing
> gradle files to run either one of the base tests or GCP tests depending on
> the types of changes. I do not expect this to have any material impact on
> the precommit times because these two test suites take about exactly the
> same time to complete.
> >
> > #9985 is merged now. Precommit times on master branch dropped to ~1h 20
> for the last 5 runs.
> >
> > On Tue, Nov 5, 2019 at 10:12 AM David Cavazos 
> wrote:
> >>
> >> +1 to moving the GCP tests outside of core. If there are issues that
> only show up on GCP tests but not in core, it might be an indication that
> there needs to be another test in core covering that, but I think that
> should be pretty rare.
> >>
> >> On Mon, Nov 4, 2019 at 8:33 PM Kenneth Knowles  wrote:
> >>>
> >>> +1 to moving forward with this
> >>>
> >>> Could we move GCP tests outside the core? Then only code changes
> touches/affecting GCP would cause them to run in precommit. Could still run
> them in postcommit in their own suite. If the core has reasonably stable
> abstractions that the connectors are built on, this should not change
> coverage much.
> >>>
> >>> Kenn
> >>>
> >>> On Mon, Nov 4, 2019 at 1:55 PM Ahmet Altay  wrote:
> 
>  PR for the proposed change: https://github.com/apache/beam/pull/9985
> 
>  On Mon, Nov 4, 2019 at 1:35 PM Udi Meiri  wrote:
> >
> > +1
> >
> > On Mon, Nov 4, 2019 at 12:09 PM Robert Bradshaw 
> wrote:
> >>
> >> +1, this seems like a good step with a clear win.
> >>
> >> On Mon, Nov 4, 2019 at 12:06 PM Ahmet Altay 
> wrote:
> >> >
> >> > Python precommits are still timing out on #9925. I am guessing
> that means this change would not be enough.
> >> >
> >> > I am proposing cutting down the number of test variants we run in
> precommits. Currently for each version we ran the following variants
> serially:
> >> > - base: Runs all unit tests with tox
> >> > - Cython: Installs cython and runs all unit tests as base
> version. The original purpose was to ensure that tests pass with or without
> cython. There is probably a huge overlap with base. (IIRC only a few coders
> have different slow vs fast tests.)
> >> > - GCP: Installs GCP dependencies and tests all base + additional
> gcp specific tests. The original purpose was to ensure that GCP is an
> optional component and all non-GCP tests still works without GCP components.
> >> >
> >> > We can reduce the list to cython + GCP tests only. This will
> cover the same group of tests and will check that tests pass with or
> without cython or GCP dependencies. This could reduce the precommit time by
> ~30 minutes.
> >> >
> >> > What do you think?
> >> >
> >> > Ahmet
> >> >
> >> >
> >> > On Tue, Oct 29, 2019 at 11:15 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >> >>
> >> >> https://github.com/apache/beam/pull/9925
> >> >>
> >> >> On Tue, Oct 29, 2019 at 10:24 AM Udi Meiri 
> wrote:
> >> >> >
> >> >> > I don't have the bandwidth right now to tackle this. Feel free
> to take it.
> >> >> >
> >> >> > On Tue, Oct 29, 2019 at 10:16 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >> >> >>
> >> >> >> The Python SDK does as well. These calls are coming from
> >> >> >> to_runner_api, is_stateful_dofn, and validate_stateful_dofn
> which are
> >> >> >> invoked once per pipene or bundle. They are, however,
> surprisingly
> >> >> >> expensive. Even memoizing across those three calls should
> save a
> >> >> >> significant amount of time. Udi, did you want to tackle this?
> >> >> >>
> >> >> >> Looking at the profile, Pipeline.to_runner_api() is being
> called 30
> >> >> >> times 

Revamping the cross-language validate runner test suite

2019-11-08 Thread Heejong Lee
Hi,

I'm working on revamping the cross-language validate runner test suite. Our
current test suite for the cross-language transform is incomplete as it
only has tests for Wordcount, DoFn, basic Count and basic Filter
transforms. My plan is, in addition to our existing set of tests, to add
all primitive transforms to the suite like GroupByKey, CoGroupByKey,
Combine, Flatten and Partition.

The link for the design doc is attached. Please feel free to comment.

https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA/edit?usp=sharing


Re: Key encodings for state requests

2019-11-08 Thread Robert Burke
And by "I wasn't clear" I meant "I misread the options".

On Fri, Nov 8, 2019, 4:14 PM Robert Burke  wrote:

> Reading back, I wasn't clear: the Go SDK does Option (1), putting the LP
> explicitly during encoding [1] for the runner proto, and explicitly expects
> LPs to contain a custom coder URN on decode for execution [2]. (Modulo an
> old bug in Dataflow where the urn was empty)
>
>
> [1]
> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348
> [2]
> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219
>
>
> On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw  wrote:
>
>> On Fri, Nov 8, 2019 at 2:09 AM jincheng sun 
>> wrote:
>> >
>> > Hi,
>> >
>> > Sorry for my late reply. It seems the conclusion has been reached. I
>> just want to share my personal thoughts.
>> >
>> > Generally, both option 1 and 3 make sense to me.
>> >
>> > >> The key concept here is not "standard coder" but "coder that the
>> > >> runner does not understand." This knowledge is only in the runner.
>> > >> Also has the downside of (2).
>> >
>> > >Yes, I had assumed "non-standard" and "unknown" are the same, but the
>> > >latter can be a subset of the former, i.e. if a Runner does not support
>> > >all of the standard coders for some reason.
>> >
>> > I'm also assume that "non-standard" and "unknown" are the same.
>> Currently, in the runner side[1] it
>> > decides whether the coder is unknown(wrap with length prefix coder)
>> according to whether the coder is among
>> > the standard coders. It will not communicate with harness to make this
>> decision.
>> >
>> > So, from my point of view, we can update the PR according to option 1
>> or 3.
>> >
>> > [1]
>> https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62
>>
>> That list is populated in Java code [1] and has typically been a
>> subset of what is in the proto file. Things like StringUtf8Coder and
>> DoubleCoder have been added at different times to different SDKs and
>> Runners, sometimes long after the URN is in the proto. Having to keep
>> this list synchronized (and versioned) would be a regression.
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
>>
>> The PR taking approach (1) looks good at a first glance (I see others
>> are reviewing it). Thanks.
>>
>> > Maximilian Michels  于2019年11月8日周五 上午3:35写道:
>> >>
>> >> > While the Go SDK doesn't yet support a State API, Option 3) is what
>> the Go SDK does for all non-standard coders (aka custom coders) anyway.
>> >>
>> >> For wire transfer, the Java Runner also adds a LengthPrefixCoder for
>> the
>> >> coder and its subcomponents. The problem is that this is an implicit
>> >> assumption made. In the Proto, we do not have this represented. This is
>> >> why **for state requests**, we end up with a
>> >> "LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on
>> >> the SDK Harness side. Note that the Python Harness does wrap unknown
>> >> coders in a LengthPrefixCoder for transferring regular elements, but
>> the
>> >> LengthPrefixCoder is not preserved for the state requests.
>> >>
>> >> In that sense (3) is good because it follows this implicit notion of
>> >> adding a LengthPrefixCoder for wire transfer, but applies it to state
>> >> requests.
>> >>
>> >> However, option (1) is most reliable because the LengthPrefixCoder is
>> >> actually in the Proto. So "CustomCoder" will always be represented as
>> >> "LengthPrefixCoder[CustomCoder]", and only standard coders will be
>> added
>> >> without a LengthPrefixCoder.
>> >>
>> >> > I'd really like to avoid implicit agreements about how the coder that
>> >> > should be used differs from what's specified in the proto in
>> different
>> >> > contexts.
>> >>
>> >> Option (2) would work on top of the existing logic because replacing a
>> >> non-standard coder with a "NOOP coder" would just be used by the Runner
>> >> to produce a serialized version of the key for partitioning. Flink
>> >> always operates on the serialized key, be it standard or non-standard
>> >> coder. It wouldn't be necessary to change any of the existing wire
>> >> transfer logic or representation. I understand that it would be less
>> >> ideal, but maybe easier to fix for the release.
>> >>
>> >> > The key concept here is not "standard coder" but "coder that the
>> >> > runner does not understand." This knowledge is only in the runner.
>> >> > Also has the downside of (2).
>> >>
>> >> Yes, I had assumed "non-standard" and "unknown" are the same, but the
>> >> latter can be a subset of the former, i.e. if a Runner does not support
>> >> all of the standard coders for some reason.
>> >>
>> >> > This means that 

Re: Key encodings for state requests

2019-11-08 Thread Robert Burke
Reading back, I wasn't clear: the Go SDK does Option (1), putting the LP
explicitly during encoding [1] for the runner proto, and explicitly expects
LPs to contain a custom coder URN on decode for execution [2]. (Modulo an
old bug in Dataflow where the urn was empty)


[1]
https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348
[2]
https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219


On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw  wrote:

> On Fri, Nov 8, 2019 at 2:09 AM jincheng sun 
> wrote:
> >
> > Hi,
> >
> > Sorry for my late reply. It seems the conclusion has been reached. I
> just want to share my personal thoughts.
> >
> > Generally, both option 1 and 3 make sense to me.
> >
> > >> The key concept here is not "standard coder" but "coder that the
> > >> runner does not understand." This knowledge is only in the runner.
> > >> Also has the downside of (2).
> >
> > >Yes, I had assumed "non-standard" and "unknown" are the same, but the
> > >latter can be a subset of the former, i.e. if a Runner does not support
> > >all of the standard coders for some reason.
> >
> > I'm also assume that "non-standard" and "unknown" are the same.
> Currently, in the runner side[1] it
> > decides whether the coder is unknown(wrap with length prefix coder)
> according to whether the coder is among
> > the standard coders. It will not communicate with harness to make this
> decision.
> >
> > So, from my point of view, we can update the PR according to option 1 or
> 3.
> >
> > [1]
> https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62
>
> That list is populated in Java code [1] and has typically been a
> subset of what is in the proto file. Things like StringUtf8Coder and
> DoubleCoder have been added at different times to different SDKs and
> Runners, sometimes long after the URN is in the proto. Having to keep
> this list synchronized (and versioned) would be a regression.
>
> [1]
> https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
>
> The PR taking approach (1) looks good at a first glance (I see others
> are reviewing it). Thanks.
>
> > Maximilian Michels  于2019年11月8日周五 上午3:35写道:
> >>
> >> > While the Go SDK doesn't yet support a State API, Option 3) is what
> the Go SDK does for all non-standard coders (aka custom coders) anyway.
> >>
> >> For wire transfer, the Java Runner also adds a LengthPrefixCoder for the
> >> coder and its subcomponents. The problem is that this is an implicit
> >> assumption made. In the Proto, we do not have this represented. This is
> >> why **for state requests**, we end up with a
> >> "LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on
> >> the SDK Harness side. Note that the Python Harness does wrap unknown
> >> coders in a LengthPrefixCoder for transferring regular elements, but the
> >> LengthPrefixCoder is not preserved for the state requests.
> >>
> >> In that sense (3) is good because it follows this implicit notion of
> >> adding a LengthPrefixCoder for wire transfer, but applies it to state
> >> requests.
> >>
> >> However, option (1) is most reliable because the LengthPrefixCoder is
> >> actually in the Proto. So "CustomCoder" will always be represented as
> >> "LengthPrefixCoder[CustomCoder]", and only standard coders will be added
> >> without a LengthPrefixCoder.
> >>
> >> > I'd really like to avoid implicit agreements about how the coder that
> >> > should be used differs from what's specified in the proto in different
> >> > contexts.
> >>
> >> Option (2) would work on top of the existing logic because replacing a
> >> non-standard coder with a "NOOP coder" would just be used by the Runner
> >> to produce a serialized version of the key for partitioning. Flink
> >> always operates on the serialized key, be it standard or non-standard
> >> coder. It wouldn't be necessary to change any of the existing wire
> >> transfer logic or representation. I understand that it would be less
> >> ideal, but maybe easier to fix for the release.
> >>
> >> > The key concept here is not "standard coder" but "coder that the
> >> > runner does not understand." This knowledge is only in the runner.
> >> > Also has the downside of (2).
> >>
> >> Yes, I had assumed "non-standard" and "unknown" are the same, but the
> >> latter can be a subset of the former, i.e. if a Runner does not support
> >> all of the standard coders for some reason.
> >>
> >> > This means that the wire format that the runner sends for the "key"
> represents the exact same wire format it will receive for state requests.
> >>
> >> The wire format for the entire element is the same. Otherwise we
> >> wouldn't be able to 

Re: Questions about the current and future design of the job service message stream

2019-11-08 Thread Luke Cwik
+Daniel Mills  for usability in job messages / logging
integration across Beam runners.

On Wed, Nov 6, 2019 at 10:30 AM Chad Dombrova  wrote:

> Hi all,
> I’ve been working lately on improving the state stream and message stream
> on the job service (links to issues and PRs below), and I’m somewhat
> confused by the inclusion of states in the message stream, since there’s a
> separate dedicated state stream for that. Here’s the proto for the message
> response:
>
> message GetJobStateResponse {  JobState.Enum state = 1; // (required)
> }
> message JobMessage {
>   string message_id = 1;
>   string time = 2;  MessageImportance importance = 3;
>   string message_text = 4;
>
>   enum MessageImportance {MESSAGE_IMPORTANCE_UNSPECIFIED = 0;
> JOB_MESSAGE_DEBUG = 1;JOB_MESSAGE_DETAILED = 2;JOB_MESSAGE_BASIC = 3; 
>JOB_MESSAGE_WARNING = 4;JOB_MESSAGE_ERROR = 5;
>   }
> }
> message JobMessagesResponse {
>   oneof response {JobMessage message_response = 1;GetJobStateResponse 
> state_response = 2;
>   }
> }
>
> You can see that each JobMessagesResponse may contain a message *or* a
> GetJobStateResponse.
>
> What’s the intention behind this design?
>
I believe this was because a user may want to listen to both job state and
messages all in one stream.

> The main benefit I see is that it’s easier to ensure that the state and
> message logs are properly ordered. For example, in the code below it’s
> unclear at a glance (at least to me) whether we’d need to use locking
> between the main thread and read_messages thread if the main thread were
> made solely responsible for logging state messages:
>
>   def wait_until_finish(self):
>
> def read_messages():
>   for message in self._message_stream:
> if message.HasField('message_response'):
>   logging.log(
>   MESSAGE_LOG_LEVELS[message.message_response.importance],
>   "%s",
>   message.message_response.message_text)
> else:
>   logging.info(
>   "Job state changed to %s",
>   self._runner_api_state_to_pipeline_state(
>   message.state_response.state))
> self._messages.append(message)
>
> t = threading.Thread(target=read_messages, name='wait_until_finish_read')
> t.daemon = True
> t.start()
>
> try:
>   for state_response in self._state_stream:
> self._state = self._runner_api_state_to_pipeline_state(
> state_response.state)
> if state_response.state in TERMINAL_STATES:
>   # Wait for any last messages.
>   t.join(10)
>   break
>   if self._state != runner.PipelineState.DONE:
> raise RuntimeError(
> 'Pipeline %s failed in state %s: %s' % (
> self._job_id, self._state, self._last_error_message()))
>   return self._state
> finally:
>   self._cleanup()
>
> The reason this is important to me is I’d like to make a handful of
> changes to GetMessageStream to make it more powerful:
>
>- propagate messages from user code (if they opt in to setting up
>their logger appropriately). currently, AFAICT, the only message the
>message stream delivers is a final error, if the job fails (other than
>state changes). It was clearly the original intent of this endpoint to
>carry other types of messages, and I'd like to bring that to fruition.
>
> Log messages is a lot of data, we do have users writing GBs/s when
aggregated across all their machines in Google Cloud so not sure if this
will scale without a lot of control on filtering. Users sometimes don't
recognize how much they are logging and if you have a 1000 VMs each writing
only a few lines at a time you can easily saturate this stream.

>
>- make it possible to backfill log messages when a client connection
>is made (limited by a min timestamp and/or max number of messages).  so if
>a client connects late it can still easily catch up with a limited amount
>of recent activity.
>
> +1

>
>- make it possible to back GetMessageStream with logging services like
>StackDriver, CloudWatch, or Elasticsearch
>
> That is interesting, originally the message stream was designed around
system messages from the runner and not specifically around users log
messages due to volume concerns. All logging integration to my knowledge
has been deferred to the client libraries for those specific services.

>
>
> Mixing state changes and log messages in the same stream adds some
> wrinkles to this plan, especially for the last one.  The reason is that log
> messages will come primarily from user code, whereas state changes come
> from the runner, and it might require some unwanted abstractions throughout
> the various runners to enable them to deliver state changes to this
> external service, whereas delivering user logs is very straightforward -
> just setup your logging handler.
>
> I’d love to know others' thoughts on what they’d like 

Re: [Discuss] Beam mascot

2019-11-08 Thread Kyle Weaver
Re fish: The authors of the Streaming Systems went with trout, but the book
mentioned a missed opportunity to make their cover a "robot dinosaur with a
Scottish accent." Perhaps that idea is worth revisiting?

On Fri, Nov 8, 2019 at 3:20 PM Luke Cwik  wrote:

> My top suggestion is a cuttlefish.
>
> On Thu, Nov 7, 2019 at 10:28 PM Reza Rokni  wrote:
>
>> Salmon... they love streams? :-)
>>
>> On Fri, 8 Nov 2019 at 12:00, Kenneth Knowles  wrote:
>>
>>> Agree with Aizhamal that it doesn't matter if they are taken if they are
>>> not too close in space to Beam: Apache projects, big data, log processing,
>>> stream processing. Not a legal opinion, but an aesthetic opinion. So I
>>> would keep Lemur as a possibility. Definitely nginx is far away from Beam
>>> so it seems OK as long as the art is different.
>>>
>>> Also FWIW there are many kinds of Lemurs, and also related Tarsier, of
>>> the only uncontroversial and non-extinct infraorder within
>>> suborder Strepsirrhini. I think there's enough room for another mascot with
>>> big glowing eyes :-). The difference in the designer's art will be more
>>> significant than the taxonomy.
>>>
>>> Kenn
>>>
>>> On Tue, Nov 5, 2019 at 4:37 PM Aizhamal Nurmamat kyzy <
>>> aizha...@apache.org> wrote:
>>>
 Aww.. that Hoover beaver is cute. But then lemur is also "taken" [1]
 and the owl too [2].

 Personally, I don't think it matters much which mascots are taken, as
 long as the project is not too close in the same space as Beam. Also, it's
 good to just get all ideas out. We should still consider hedgehogs. I
 looked up fireflies, they don't look nice, but i am not dismissing the idea
 :/

 And thanks for reaching out to designers, Max. To your point:
 >how do we arrive at a concrete design
 >once we have consensus on the type of mascot?
 My thinking is that the designer will come up with few sketches, then
 we vote on one here in the dev@ list.

 [1]
 https://www.nginx.com/blog/introducing-the-lemur-stack-and-an-official-nginx-mascot/
 [2] https://blog.readme.io/why-every-startup-needs-a-mascot/

 On Tue, Nov 5, 2019 at 5:31 AM Maximilian Michels 
 wrote:

> Quick update: The mentioned designer has gotten back to me and offered
> to sketch something until the end of the week. I've pointed him to
> this
> thread and the existing logo material:
> https://beam.apache.org/community/logos/
>
> [I don't want to interrupt the discussion in any way, I just think
> having something concrete will help us to eventually decide what we
> want.]
>
> On 05.11.19 12:49, Maximilian Michels wrote:
> > How about fireflies in the Beam light rays? ;)
> >
> >> Feels like "Beam" would go well with an animal that has glowing
> bright
> >> eyes such as a lemur
> >
> > I love the lemur idea because it has almost orange eyes.
> >
> > Thanks for starting this Aizhamal! I've recently talked to a
> designer
> > which is somewhat famous for creating logos. He was inclined to work
> on
> > a software project logo. Of course there is a little bit of a price
> tag
> > attached, though the quote sounded reasonable.
> >
> > It raises the general question, how do we arrive at a concrete
> design
> > once we have consensus on the type of mascot? I believe there are
> also
> > designers working at companies using Beam ;)
> >
> > Cheers,
> > Max
> >
> > On 05.11.19 06:14, Eugene Kirpichov wrote:
> >> Feels like "Beam" would go well with an animal that has glowing
> bright
> >> eyes (with beams of light shooting out of them), such as a lemur
> [1]
> >> or an owl.
> >>
> >> [1] https://www.cnn.com/travel/article/madagascar-lemurs/index.html
> >>
> >> On Mon, Nov 4, 2019 at 7:33 PM Kenneth Knowles  >> > wrote:
> >>
> >> Yes! Let's have a mascot!
> >>
> >> Direct connections often have duplicates. For example in the log
> >> processing space, there is
> https://www.linkedin.com/in/hooverbeaver
> >>
> >> I like a flying squirrel, but Flink already is a squirrel.
> >>
> >> Hedgehog? I could not find any source of confusion for this one.
> >>
> >> Kenn
> >>
> >>
> >> On Mon, Nov 4, 2019 at 6:02 PM Robert Burke  >> > wrote:
> >>
> >> As both a Canadian, and the resident fan of a programming
> >> language with a rodent mascot, I endorse this mascot.
> >>
> >> On Mon, Nov 4, 2019, 4:11 PM David Cavazos <
> dcava...@google.com
> >> > wrote:
> >>
> >> I like it, a beaver could be a cute mascot :)
> >>
> >> On Mon, Nov 4, 2019 at 3:33 PM Aizhamal Nurmamat kyzy
> >> 

Re: [Discuss] Beam mascot

2019-11-08 Thread Luke Cwik
My top suggestion is a cuttlefish.

On Thu, Nov 7, 2019 at 10:28 PM Reza Rokni  wrote:

> Salmon... they love streams? :-)
>
> On Fri, 8 Nov 2019 at 12:00, Kenneth Knowles  wrote:
>
>> Agree with Aizhamal that it doesn't matter if they are taken if they are
>> not too close in space to Beam: Apache projects, big data, log processing,
>> stream processing. Not a legal opinion, but an aesthetic opinion. So I
>> would keep Lemur as a possibility. Definitely nginx is far away from Beam
>> so it seems OK as long as the art is different.
>>
>> Also FWIW there are many kinds of Lemurs, and also related Tarsier, of
>> the only uncontroversial and non-extinct infraorder within
>> suborder Strepsirrhini. I think there's enough room for another mascot with
>> big glowing eyes :-). The difference in the designer's art will be more
>> significant than the taxonomy.
>>
>> Kenn
>>
>> On Tue, Nov 5, 2019 at 4:37 PM Aizhamal Nurmamat kyzy <
>> aizha...@apache.org> wrote:
>>
>>> Aww.. that Hoover beaver is cute. But then lemur is also "taken" [1] and
>>> the owl too [2].
>>>
>>> Personally, I don't think it matters much which mascots are taken, as
>>> long as the project is not too close in the same space as Beam. Also, it's
>>> good to just get all ideas out. We should still consider hedgehogs. I
>>> looked up fireflies, they don't look nice, but i am not dismissing the idea
>>> :/
>>>
>>> And thanks for reaching out to designers, Max. To your point:
>>> >how do we arrive at a concrete design
>>> >once we have consensus on the type of mascot?
>>> My thinking is that the designer will come up with few sketches, then we
>>> vote on one here in the dev@ list.
>>>
>>> [1]
>>> https://www.nginx.com/blog/introducing-the-lemur-stack-and-an-official-nginx-mascot/
>>> [2] https://blog.readme.io/why-every-startup-needs-a-mascot/
>>>
>>> On Tue, Nov 5, 2019 at 5:31 AM Maximilian Michels 
>>> wrote:
>>>
 Quick update: The mentioned designer has gotten back to me and offered
 to sketch something until the end of the week. I've pointed him to this
 thread and the existing logo material:
 https://beam.apache.org/community/logos/

 [I don't want to interrupt the discussion in any way, I just think
 having something concrete will help us to eventually decide what we
 want.]

 On 05.11.19 12:49, Maximilian Michels wrote:
 > How about fireflies in the Beam light rays? ;)
 >
 >> Feels like "Beam" would go well with an animal that has glowing
 bright
 >> eyes such as a lemur
 >
 > I love the lemur idea because it has almost orange eyes.
 >
 > Thanks for starting this Aizhamal! I've recently talked to a designer
 > which is somewhat famous for creating logos. He was inclined to work
 on
 > a software project logo. Of course there is a little bit of a price
 tag
 > attached, though the quote sounded reasonable.
 >
 > It raises the general question, how do we arrive at a concrete design
 > once we have consensus on the type of mascot? I believe there are
 also
 > designers working at companies using Beam ;)
 >
 > Cheers,
 > Max
 >
 > On 05.11.19 06:14, Eugene Kirpichov wrote:
 >> Feels like "Beam" would go well with an animal that has glowing
 bright
 >> eyes (with beams of light shooting out of them), such as a lemur [1]
 >> or an owl.
 >>
 >> [1] https://www.cnn.com/travel/article/madagascar-lemurs/index.html
 >>
 >> On Mon, Nov 4, 2019 at 7:33 PM Kenneth Knowles >>> >> > wrote:
 >>
 >> Yes! Let's have a mascot!
 >>
 >> Direct connections often have duplicates. For example in the log
 >> processing space, there is
 https://www.linkedin.com/in/hooverbeaver
 >>
 >> I like a flying squirrel, but Flink already is a squirrel.
 >>
 >> Hedgehog? I could not find any source of confusion for this one.
 >>
 >> Kenn
 >>
 >>
 >> On Mon, Nov 4, 2019 at 6:02 PM Robert Burke >>> >> > wrote:
 >>
 >> As both a Canadian, and the resident fan of a programming
 >> language with a rodent mascot, I endorse this mascot.
 >>
 >> On Mon, Nov 4, 2019, 4:11 PM David Cavazos <
 dcava...@google.com
 >> > wrote:
 >>
 >> I like it, a beaver could be a cute mascot :)
 >>
 >> On Mon, Nov 4, 2019 at 3:33 PM Aizhamal Nurmamat kyzy
 >> mailto:aizha...@apache.org>>
 wrote:
 >>
 >> Hi everybody,
 >>
 >> I think the idea of creating a Beam mascot has been
 >> brought up a couple times here in the past, but I
 would
 >> like us to go through with it this time if we are
 all in
 >> agreement:)
 >>
 >>

New Contributor

2019-11-08 Thread Yang Zhang
Hello Beam community,

This is Yang from LinkedIn. I am closely working with Xinyu on adopting
Beam SQL in LinkedIn. Can someone add me as a contributor for Beam's Jira
issue tracker? I would like to create/assign tickets for my work. My Jira
ID is *yangzhang*. Thanks!

Best,
Yang


Re: Detecting resources to stage

2019-11-08 Thread Luke Cwik
I believe the closest suggestion[1] we had that worked for Java 11 and
maintained backwards compatibility was to use the URLClassLoader to infer
the resources and if we couldn't do that then look at the java.class.path
system property to do the inference otherwise fail and force the users to
tell us what. There are too many scenarios where we will do it wrong
because of how people package and deploy their code whether it is an
embedded application server or some other application container with a
security manager that will prevent us from doing the right thing.

On Fri, Nov 8, 2019 at 10:31 AM Robert Bradshaw  wrote:

> Note that resources are more properly tied to specific operations and
> stages, not to the entire pipeline. This is especially true in the
> face of libraries (which should have the ability to declare their own
> resources) and cross-language.
>
> On Fri, Nov 8, 2019 at 10:19 AM Łukasz Gajowy  wrote:
> >
> > I figured that it would be good to bump this thread for greater
> visibility even though I don't have a strong opinion about this (yet -
> hopefully, I will know more later to be able to share ;) ).
> >
> > Answering the questions Luke asked will unblock this issue:
> https://issues.apache.org/jira/browse/BEAM-5495. Solving it is needed for
> Java 11 migration (current detecting mechanism does not work with java > 8).
> >
> >
> >>
> >> That said letting the user resolve the jars to stage can be saner
> instead of assuming it is in the classpath/loader. I already have a few
> cases where it will fail cause the transforms load the jars from outside
> the app classloader (transforms are isolated).
> >
> >
> >
> > If I understand correctly, at least in Dataflow runner, if users want to
> provide custom resources to stage, they can use filesToStage pipeline
> option. Once the option is not null, the runner doesn't detect the
> resources automatically and stages resources enlisted in the option
> instead. I think this should be the approach common for all runners (if it
> is not the case already).
>

Your understanding is correct and consistency across runners for a pipeline
option is good for our users.


> >
> > Thanks,
> > Łukasz
> >
> >
> >
>

1: https://github.com/apache/beam/pull/8775


Re: Cython unit test suites running without Cythonized sources

2019-11-08 Thread Robert Bradshaw
On Thu, Nov 7, 2019 at 6:25 PM Chad Dombrova  wrote:
>
> Hi,
> Answers inline below,
>
>>> It's unclear from the nose source[1] whether it's calling build_py and 
>>> build_ext, or just build_ext.  It's also unclear whether the result of that 
>>> build is actually used.  When python setup.py nosetests runs, it runs 
>>> inside of a virtualenv created by tox, and tox has already installed beam 
>>> into that venv.  It seems unlikely to me that build_ext or build_py is 
>>> going to install over top of the beam package installed by tox, but who 
>>> knows, it may end up first on the path.  Also recall that there is an sdist 
>>> gradle task running before tox that creates a tarball that is passed to 
>>> run_tox.sh which passes it along to tox --installpkg flag[2] so that tox 
>>> won't build beam itself.
>>
>>
>> I believe the build step is executed as expected and during installation 
>> results in cythonized package to be installed. This could be verified by, in 
>> a new virtual environment creating a source distribution, installing cython, 
>> then installing the source distribution. Resulting installation does have 
>> the .so files. This is done before running nosetests.
>
>
> Even if it *is* working, I think it's a pretty poor design that we build it 
> once in sdist and then rebuild it again with nose.  It's very obfuscated and 
> brittle, hence we're still debating the probable outcome.  We should choose 
> one place to build and that should either be the sdist gradle task or tox, 
> not the test command.

While "building" is a bit of an odd concept in Python, these steps
have different roles.

The sdist step creates a package that should be installed into each
tox environment. If the tox environment has cython when this apache
beam package is installed, it should be used. Nose (or whatever)
should then run the tests.

I agree this could be cleaned up. I personally don't gradle to execute
any of this stuff so don't remember how it's set up.

>>> We should designate a single place that always does the build.  I thought 
>>> that was supposed to be the gradle sdist task, but invoking nose via 
>>> `python setup.py` means that we're introducing the possibility that a build 
>>> is occurring which may or may not be used, depending on the entirely 
>>> unclear dependencies of the setup commands, and the entirely unclear 
>>> relationship of that build output to the tox venv.  As a first step of 
>>> clarity, we could stop invoking nose using `python setup.py nosetests`, and 
>>> instead use `nosetests` (and in the future `pytest`).  I think the reason 
>>> for `python setup.py nosetest` is to ensure the test requirements are 
>>> installed,
>>
>>
>> I believe the reason we are invokign nosetest this way is related to the 
>> beam testing plugin. It is configure in setup.py. The behavior is documented 
>> here: https://nose.readthedocs.io/en/latest/api/commands.html
>
>
> It is possible to register a custom plugin without using setup.py: 
> https://nose.readthedocs.io/en/latest/plugins/writing.html#registering-a-plugin-without-setuptools
>
> Since we're on the verge of switching to pytest, perhaps we should 
> investigate switching that over to to not use setup.py instead of chasing our 
> tails with nose.
>
>>>
>>> but we could shift those to a separate requirements file or use 
>>> extra_requires and tox can ensure that they're installed.  I find these two 
>>> options to be pretty common patterns [3].
>>
>>
>> We do use extras is tox already. GCP tests work this way by installing 
>> additional GCP package. In my opinion, letting tox to setup the virtual 
>> environment either from the package or from setup.py is a better option than 
>> using requirements file. Otherwise we would need a way to keep setup.py and 
>> requirements file in sync.
>
>
> Oh yeah, I see that the tests already are an extra package.  Well, that'll 
> make it that much easier to stop using `python setup.py nosetests`.
>
> -chad
>


Re: Python Precommit duration pushing 2 hours

2019-11-08 Thread Robert Bradshaw
Just saw another 2-hour timeout:
https://builds.apache.org/job/beam_PreCommit_Python_Commit/9440/ , so
perhaps we're not out of the woods yet (though in general things have
been a lot better).

On Tue, Nov 5, 2019 at 10:52 AM Ahmet Altay  wrote:
>
> GCP tests are already on separate locations. IO related tests are under 
> /sdks/python/apache_beam/io/gcp and Dataflow related tests are under 
> sdks/python/apache_beam/runners/dataflow. It should be a matter of changing 
> gradle files to run either one of the base tests or GCP tests depending on 
> the types of changes. I do not expect this to have any material impact on the 
> precommit times because these two test suites take about exactly the same 
> time to complete.
>
> #9985 is merged now. Precommit times on master branch dropped to ~1h 20 for 
> the last 5 runs.
>
> On Tue, Nov 5, 2019 at 10:12 AM David Cavazos  wrote:
>>
>> +1 to moving the GCP tests outside of core. If there are issues that only 
>> show up on GCP tests but not in core, it might be an indication that there 
>> needs to be another test in core covering that, but I think that should be 
>> pretty rare.
>>
>> On Mon, Nov 4, 2019 at 8:33 PM Kenneth Knowles  wrote:
>>>
>>> +1 to moving forward with this
>>>
>>> Could we move GCP tests outside the core? Then only code changes 
>>> touches/affecting GCP would cause them to run in precommit. Could still run 
>>> them in postcommit in their own suite. If the core has reasonably stable 
>>> abstractions that the connectors are built on, this should not change 
>>> coverage much.
>>>
>>> Kenn
>>>
>>> On Mon, Nov 4, 2019 at 1:55 PM Ahmet Altay  wrote:

 PR for the proposed change: https://github.com/apache/beam/pull/9985

 On Mon, Nov 4, 2019 at 1:35 PM Udi Meiri  wrote:
>
> +1
>
> On Mon, Nov 4, 2019 at 12:09 PM Robert Bradshaw  
> wrote:
>>
>> +1, this seems like a good step with a clear win.
>>
>> On Mon, Nov 4, 2019 at 12:06 PM Ahmet Altay  wrote:
>> >
>> > Python precommits are still timing out on #9925. I am guessing that 
>> > means this change would not be enough.
>> >
>> > I am proposing cutting down the number of test variants we run in 
>> > precommits. Currently for each version we ran the following variants 
>> > serially:
>> > - base: Runs all unit tests with tox
>> > - Cython: Installs cython and runs all unit tests as base version. The 
>> > original purpose was to ensure that tests pass with or without cython. 
>> > There is probably a huge overlap with base. (IIRC only a few coders 
>> > have different slow vs fast tests.)
>> > - GCP: Installs GCP dependencies and tests all base + additional gcp 
>> > specific tests. The original purpose was to ensure that GCP is an 
>> > optional component and all non-GCP tests still works without GCP 
>> > components.
>> >
>> > We can reduce the list to cython + GCP tests only. This will cover the 
>> > same group of tests and will check that tests pass with or without 
>> > cython or GCP dependencies. This could reduce the precommit time by 
>> > ~30 minutes.
>> >
>> > What do you think?
>> >
>> > Ahmet
>> >
>> >
>> > On Tue, Oct 29, 2019 at 11:15 AM Robert Bradshaw  
>> > wrote:
>> >>
>> >> https://github.com/apache/beam/pull/9925
>> >>
>> >> On Tue, Oct 29, 2019 at 10:24 AM Udi Meiri  wrote:
>> >> >
>> >> > I don't have the bandwidth right now to tackle this. Feel free to 
>> >> > take it.
>> >> >
>> >> > On Tue, Oct 29, 2019 at 10:16 AM Robert Bradshaw 
>> >> >  wrote:
>> >> >>
>> >> >> The Python SDK does as well. These calls are coming from
>> >> >> to_runner_api, is_stateful_dofn, and validate_stateful_dofn which 
>> >> >> are
>> >> >> invoked once per pipene or bundle. They are, however, surprisingly
>> >> >> expensive. Even memoizing across those three calls should save a
>> >> >> significant amount of time. Udi, did you want to tackle this?
>> >> >>
>> >> >> Looking at the profile, Pipeline.to_runner_api() is being called 30
>> >> >> times in this test, and [Applied]PTransform.to_fn_api being called
>> >> >> 3111 times, so that in itself might be interesting to investigate.
>> >> >>
>> >> >> On Tue, Oct 29, 2019 at 8:26 AM Robert Burke  
>> >> >> wrote:
>> >> >> >
>> >> >> > As does the Go SDK. Invokers are memoized and when possible code 
>> >> >> > is generated to avoid reflection.
>> >> >> >
>> >> >> > On Tue, Oct 29, 2019, 6:46 AM Kenneth Knowles  
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Noting for the benefit of the thread archive in case someone 
>> >> >> >> goes digging and wonders if this affects other SDKs: the Java 
>> >> >> >> SDK memoizes DoFnSignatures and generated DoFnInvoker classes.
>> >> >> >>
>> >> >> >> Kenn
>> >> >> >>
>> >> >> >> On Mon, Oct 

Re: New Contributor

2019-11-08 Thread Luke Cwik
Welcome, I have added you as a contributor.

On Fri, Nov 8, 2019 at 10:16 AM Andrew Crites  wrote:

> It's crites. Thanks!
>
> On Thu, Nov 7, 2019 at 3:06 PM Kyle Weaver  wrote:
>
>> Can you please share your Jira username?
>>
>> On Thu, Nov 7, 2019 at 3:04 PM Andrew Crites 
>> wrote:
>>
>>> This is Andrew Crites. I'm making some changes to the Python Dataflow
>>> runner. Can someone add me as a contributor for Beam's Jira issue tracker?
>>> Apparently I can't be assigned issues right now.
>>>
>>> Thanks!
>>>
>>


Re: [DISCUSS] Avoid redundant encoding and decoding between runner and harness

2019-11-08 Thread Kenneth Knowles
On Fri, Nov 8, 2019 at 9:23 AM Luke Cwik  wrote:

>
>
> On Thu, Nov 7, 2019 at 7:36 PM Kenneth Knowles  wrote:
>
>>
>>
>> On Thu, Nov 7, 2019 at 9:19 AM Luke Cwik  wrote:
>>
>>> I did suggest one other alternative on Jincheng's PR[1] which was to
>>> allow windowless values to be sent across the gRPC port. The SDK would then
>>> be responsible for ensuring that the execution didn't access any properties
>>> that required knowledge of the timestamp, pane or window. This is different
>>> then adding the ValueOnlyWindowedValueCoder as a model coder because it
>>> allows SDKs to pass around raw values to functions without any windowing
>>> overhead which could be useful for things like the side input window
>>> mapping or window merging functions we have.
>>>
>>
>> When you say "pass around" what does it mean? If it is over the wire,
>> there is already no overhead to ValueOnlyWindowedValueCoder. So do you mean
>> the overhead of having the layer of boxing of WindowedValue? I would assume
>> all non-value components of the WindowedValue from
>> ValueOnlyWindowedValueCoder are pointers to a single shared immutable
>> instance carried by the coder instance.
>>
>
> I was referring to the layer of boxing of WindowedValue. My concern wasn't
> the performance overhead of passing around a wrapper object but the
> cognitive overhead of understanding why everything needs to be wrapped in a
> windowed value. Since you have been working on SQL for some time, this
> would be analogous to executing a UDF and making all the machinery around
> it take WindowedValue instead of T.
>
>
>> I think raw values can already be passed to functions, no? The main thing
>> is that elements in a PCollection always have a window, timestamp, and
>> paneinfo. Not all values are elements. Is there a specific case you have in
>> mind? I would not expect WindowMappingFn or window merging fn to be passing
>> "elements" but just values of the appropriate type for the function.
>>
>
> This is about the machinery around WindowMappingFn/WindowMergingFn. For
> example the implementation around WindowMappingFn takes a
> WindowedValue and unwraps it forwarding it to the WindowMappingFn
> and then takes the result and wraps it in a WindowedValue and
> returns that to the runner.
>

I'm not familiar with this, but it sounds like it should not be necessary
and is an implementation detail. Is there a model change necessary to avoid
the unboxing/boxing? I would be surprised.

Kenn


>
>
>>
>>
>>> On Thu, Nov 7, 2019 at 8:48 AM Robert Bradshaw 
>>> wrote:
>>>
 I think there is some misunderstanding about what is meant by option
 2. What Kenn (I think) and I are proposing is not a WindowedValueCoder
 whose window/timestamp/paneinfo coders are parameterized to be
 constant coders, but a WindowedValueCoder whose
 window/timestamp/paneinfo values are specified as constants in the
 coder.

 Let's call this NewValueOnlyWindowedValueCoder, and is parameterized
 by a window, timestamp, and pane info instance

 The existing ValueOnlyWindowedValueCoder is literally
 NewValueOnlyWindowedValueCoder(GlobalWindow, MIN_TIMESTAMP,
 PaneInfo.NO_FIRING). Note in particular that using the existing
 ValueOnlyWindowedValueCoder would give the wrong timestamp and pane
 info if it is use for the result of a GBK, which I think is the loss
 of consistency referred to here.

>>>
>> Yes, this is exactly what I am proposing and sounds like what "approach
>> 2" is. I think this approach "just works". It is simpler and more efficient
>> than "WindowedValueCoder.of(ValueCoder, WindowCoder, TimestampCoder,
>> PaneInfoCoder)" which would require some overhead even if WindowCoder,
>> TimestampCoder and PaneInfoCoder were constant coders consuming zero bytes.
>>
>> Kenn
>>
>>
>>
>>> On Thu, Nov 7, 2019 at 1:03 AM jincheng sun 
 wrote:
 >
 > Thanks for your feedback and the valuable comments, Kenn & Robert!
 >
 > I think your comments are more comprehensive and enlighten me a lot.
 The two proposals which I mentioned above are to reuse the existing coder
 (FullWindowedValueCoder and ValueOnlyWindowedValueCoder). Now, with your
 comments, I think we can further abstract 'FullWindowedValueCoder' and
 'ValueOnlyWindowedValueCoder', that is, we can rename
 'FullWindowedValueCoder' to 'WindowedValueCoder', and make the coders of
 window/timestamp/pane configurable. Then we can remove
 'ValueOnlyWindowedValueCoder' and need to add a mount of constant coders
 for window/timestamp/pane.
 >
 > I have replied your comments on the doc, and quick feedback as
 following:
 >
 > Regarding to "Approach 2: probably no SDK harness work / compatible
 with existing Beam model so no risk of introducing inconsistency",if we
 "just puts default window/timestamp/pane info on elements" and don't change
 the original coder, then the performance is not optimized. If we 

Re: [discuss] More dimensions for the Capability Matrix

2019-11-08 Thread Valentyn Tymofieiev
+1. I think we should also better reflect connector capabilities (or
include them into features), to avoid surprises like [1].

[1]
https://lists.apache.org/thread.html/9e9270bfb85058e24b762790e948d8bfc558f58ef1df9e14c4e4464c@%3Cuser.beam.apache.org%3E

On Fri, Nov 8, 2019 at 10:51 AM Kenneth Knowles  wrote:

>
> On Fri, Nov 8, 2019 at 9:46 AM Brian Hulette  wrote:
>
>> > Does it make sense to do this?
>> I think this makes a lot of sense. Plus it's a good opportunity to
>> refresh the UX of [1].
>>
>
> +1 to total UX refresh. I will advertise
> https://issues.apache.org/jira/browse/BEAM-2888 which has a lot of
> related ideas.
>
>
>> > what's a good way of doing it? Should we expand the existing Capability
>> Matrix to support SDKs as well? Or should we have a new one?
>> To me there are two aspects to this: how we model the data, and how we
>> present the data.
>>
>> For modelling the data:
>> Do we need to maintain the full 3-dimensional 
>> matrix? That seems untenable to me. With portability, I think the runner
>> and SDK matrix should be completely independent, so it should be safe to
>> just maintain , and  matrices and model
>> the 3-dimensional matrix as the cross-product of the two.
>> Maybe we should have a new capability matrix just for portable runners so
>> we can exploit this property?
>>
>
> Agree that we should not do the full product of <# runners> times <#
> SDKs>. That's the whole point of portability.
>
> Early in the project, we deliberately did not include SDKs in the
> capability matrix for philosophical reasons:
>
>  - a runner might support/not support features based on intrinsic
> properties of the underlying engine, not just immaturity
>  - an SDK can have no such intrinsic reason
>
> In practice, though, the matrix is more about maturity than intrinsic
> properties. And SDKs include now a significant runtime component which will
> always take time to implement new model features. I think we should embrace
> the matrix as primarily a measure of maturity and answer Pablo's initial
> question which is most useful for users.
>
> Kenn
>
>
>> For presenting the data:
>> I think there would be value in just presenting 
>> (basically what we have now in [1]), and also presenting 
>> separately. The  display could serve as documentation too,
>> with examples of how to do Y in each SDK.
>> Maybe there would also be value in presenting  in
>> some fancy UI so an architect can quickly answer "what can I do with SDK Z
>> on Runner X", but I'm not sure what that would look like.
>>
>> [1] https://beam.apache.org/documentation/runners/capability-matrix/
>>
>> On Thu, Nov 7, 2019 at 10:09 PM Thomas Weise  wrote:
>>
>>> FWIW there are currently at least 2 instances of capability matrix [1]
>>> [2].
>>>
>>> [1] has been in need of a refresh for a while.
>>>
>>> [2] is more useful but only covers portable runners and is hard to find.
>>>
>>> Thomas
>>>
>>> [1] https://beam.apache.org/documentation/runners/capability-matrix/
>>> [2] https://s.apache.org/apache-beam-portability-support-table
>>>
>>> On Thu, Nov 7, 2019 at 7:52 PM Pablo Estrada  wrote:
>>>
 Hi all,
 I think this is a relatively common question:

 - Can I do X with runner Y, and SDK Z?

 The answers vary significantly between SDK and Runner pairs. This makes
 it such that the current Capability Matrix falls somewhat short when
 potential users / solutions architects / etc are trying to decide to adopt
 Beam, and which Runner / SDK to use.

 I think we need to put some effort in building a capability matrix that
 expresses this information - and maintain it updated.

 I would like to discuss a few things:
 - Does it make sense to do this?
 - If it does, what's a good way of doing it? Should we expand the
 existing Capability Matrix to support SDKs as well? Or should we have a new
 one?
 - Any other thoughts you may have about the idea.

 Best
 -P.

>>>


Re: [discuss] More dimensions for the Capability Matrix

2019-11-08 Thread Kenneth Knowles
On Fri, Nov 8, 2019 at 9:46 AM Brian Hulette  wrote:

> > Does it make sense to do this?
> I think this makes a lot of sense. Plus it's a good opportunity to refresh
> the UX of [1].
>

+1 to total UX refresh. I will advertise
https://issues.apache.org/jira/browse/BEAM-2888 which has a lot of related
ideas.


> > what's a good way of doing it? Should we expand the existing Capability
> Matrix to support SDKs as well? Or should we have a new one?
> To me there are two aspects to this: how we model the data, and how we
> present the data.
>
> For modelling the data:
> Do we need to maintain the full 3-dimensional 
> matrix? That seems untenable to me. With portability, I think the runner
> and SDK matrix should be completely independent, so it should be safe to
> just maintain , and  matrices and model
> the 3-dimensional matrix as the cross-product of the two.
> Maybe we should have a new capability matrix just for portable runners so
> we can exploit this property?
>

Agree that we should not do the full product of <# runners> times <# SDKs>.
That's the whole point of portability.

Early in the project, we deliberately did not include SDKs in the
capability matrix for philosophical reasons:

 - a runner might support/not support features based on intrinsic
properties of the underlying engine, not just immaturity
 - an SDK can have no such intrinsic reason

In practice, though, the matrix is more about maturity than intrinsic
properties. And SDKs include now a significant runtime component which will
always take time to implement new model features. I think we should embrace
the matrix as primarily a measure of maturity and answer Pablo's initial
question which is most useful for users.

Kenn


> For presenting the data:
> I think there would be value in just presenting 
> (basically what we have now in [1]), and also presenting 
> separately. The  display could serve as documentation too,
> with examples of how to do Y in each SDK.
> Maybe there would also be value in presenting  in
> some fancy UI so an architect can quickly answer "what can I do with SDK Z
> on Runner X", but I'm not sure what that would look like.
>
> [1] https://beam.apache.org/documentation/runners/capability-matrix/
>
> On Thu, Nov 7, 2019 at 10:09 PM Thomas Weise  wrote:
>
>> FWIW there are currently at least 2 instances of capability matrix [1]
>> [2].
>>
>> [1] has been in need of a refresh for a while.
>>
>> [2] is more useful but only covers portable runners and is hard to find.
>>
>> Thomas
>>
>> [1] https://beam.apache.org/documentation/runners/capability-matrix/
>> [2] https://s.apache.org/apache-beam-portability-support-table
>>
>> On Thu, Nov 7, 2019 at 7:52 PM Pablo Estrada  wrote:
>>
>>> Hi all,
>>> I think this is a relatively common question:
>>>
>>> - Can I do X with runner Y, and SDK Z?
>>>
>>> The answers vary significantly between SDK and Runner pairs. This makes
>>> it such that the current Capability Matrix falls somewhat short when
>>> potential users / solutions architects / etc are trying to decide to adopt
>>> Beam, and which Runner / SDK to use.
>>>
>>> I think we need to put some effort in building a capability matrix that
>>> expresses this information - and maintain it updated.
>>>
>>> I would like to discuss a few things:
>>> - Does it make sense to do this?
>>> - If it does, what's a good way of doing it? Should we expand the
>>> existing Capability Matrix to support SDKs as well? Or should we have a new
>>> one?
>>> - Any other thoughts you may have about the idea.
>>>
>>> Best
>>> -P.
>>>
>>


Re: [discuss] More dimensions for the Capability Matrix

2019-11-08 Thread Robert Bradshaw
On Fri, Nov 8, 2019 at 9:46 AM Brian Hulette  wrote:
>
> > Does it make sense to do this?
> I think this makes a lot of sense. Plus it's a good opportunity to refresh 
> the UX of [1].
>
> > what's a good way of doing it? Should we expand the existing Capability 
> > Matrix to support SDKs as well? Or should we have a new one?
> To me there are two aspects to this: how we model the data, and how we 
> present the data.
>
> For modelling the data:
> Do we need to maintain the full 3-dimensional  
> matrix? That seems untenable to me. With portability, I think the runner and 
> SDK matrix should be completely independent, so it should be safe to just 
> maintain , and  matrices and model the 
> 3-dimensional matrix as the cross-product of the two.
> Maybe we should have a new capability matrix just for portable runners so we 
> can exploit this property?

Yes, being able to do that is the crux of the portability work. We may
have to consider, say, "Portable Spark" and "Non-Portable Spark" to be
two separate runners and have the caveat that some runners (namely the
non-portable ones) do not work with all SDKs.

Another thing I'd really, really like to see is these matrices
automatically populated via validates runner test attributes. E.g. you
can pick a runner, run the validates runner test suite, and see what
is fully/partially/not at all supported. This is harder to do for
SDKs, but at least you could get some signal by looking for the
existence of (passing) tests.

> For presenting the data:
> I think there would be value in just presenting  (basically 
> what we have now in [1]), and also presenting  separately. The 
>  display could serve as documentation too, with examples of 
> how to do Y in each SDK.
> Maybe there would also be value in presenting  in 
> some fancy UI so an architect can quickly answer "what can I do with SDK Z on 
> Runner X", but I'm not sure what that would look like.

I think two tables are fine. Note that with cross-language, the
restrictions of an SDK become less of an issue. One could imagine UIs
that would let you select a (set of?) SDKs and runners and
automatically populates the matrix according to the intersection.

> [1] https://beam.apache.org/documentation/runners/capability-matrix/
>
> On Thu, Nov 7, 2019 at 10:09 PM Thomas Weise  wrote:
>>
>> FWIW there are currently at least 2 instances of capability matrix [1] [2].
>>
>> [1] has been in need of a refresh for a while.
>>
>> [2] is more useful but only covers portable runners and is hard to find.
>>
>> Thomas
>>
>> [1] https://beam.apache.org/documentation/runners/capability-matrix/
>> [2] https://s.apache.org/apache-beam-portability-support-table
>>
>> On Thu, Nov 7, 2019 at 7:52 PM Pablo Estrada  wrote:
>>>
>>> Hi all,
>>> I think this is a relatively common question:
>>>
>>> - Can I do X with runner Y, and SDK Z?
>>>
>>> The answers vary significantly between SDK and Runner pairs. This makes it 
>>> such that the current Capability Matrix falls somewhat short when potential 
>>> users / solutions architects / etc are trying to decide to adopt Beam, and 
>>> which Runner / SDK to use.
>>>
>>> I think we need to put some effort in building a capability matrix that 
>>> expresses this information - and maintain it updated.
>>>
>>> I would like to discuss a few things:
>>> - Does it make sense to do this?
>>> - If it does, what's a good way of doing it? Should we expand the existing 
>>> Capability Matrix to support SDKs as well? Or should we have a new one?
>>> - Any other thoughts you may have about the idea.
>>>
>>> Best
>>> -P.


Re: Detecting resources to stage

2019-11-08 Thread Robert Bradshaw
Note that resources are more properly tied to specific operations and
stages, not to the entire pipeline. This is especially true in the
face of libraries (which should have the ability to declare their own
resources) and cross-language.

On Fri, Nov 8, 2019 at 10:19 AM Łukasz Gajowy  wrote:
>
> I figured that it would be good to bump this thread for greater visibility 
> even though I don't have a strong opinion about this (yet - hopefully, I will 
> know more later to be able to share ;) ).
>
> Answering the questions Luke asked will unblock this issue: 
> https://issues.apache.org/jira/browse/BEAM-5495. Solving it is needed for 
> Java 11 migration (current detecting mechanism does not work with java > 8).
>
>
>>
>> That said letting the user resolve the jars to stage can be saner instead of 
>> assuming it is in the classpath/loader. I already have a few cases where it 
>> will fail cause the transforms load the jars from outside the app 
>> classloader (transforms are isolated).
>
>
>
> If I understand correctly, at least in Dataflow runner, if users want to 
> provide custom resources to stage, they can use filesToStage pipeline option. 
> Once the option is not null, the runner doesn't detect the resources 
> automatically and stages resources enlisted in the option instead. I think 
> this should be the approach common for all runners (if it is not the case 
> already).
>
> Thanks,
> Łukasz
>
>
>


Re: Key encodings for state requests

2019-11-08 Thread Robert Bradshaw
On Fri, Nov 8, 2019 at 2:09 AM jincheng sun  wrote:
>
> Hi,
>
> Sorry for my late reply. It seems the conclusion has been reached. I just 
> want to share my personal thoughts.
>
> Generally, both option 1 and 3 make sense to me.
>
> >> The key concept here is not "standard coder" but "coder that the
> >> runner does not understand." This knowledge is only in the runner.
> >> Also has the downside of (2).
>
> >Yes, I had assumed "non-standard" and "unknown" are the same, but the
> >latter can be a subset of the former, i.e. if a Runner does not support
> >all of the standard coders for some reason.
>
> I'm also assume that "non-standard" and "unknown" are the same. Currently, in 
> the runner side[1] it
> decides whether the coder is unknown(wrap with length prefix coder) according 
> to whether the coder is among
> the standard coders. It will not communicate with harness to make this 
> decision.
>
> So, from my point of view, we can update the PR according to option 1 or 3.
>
> [1] 
> https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62

That list is populated in Java code [1] and has typically been a
subset of what is in the proto file. Things like StringUtf8Coder and
DoubleCoder have been added at different times to different SDKs and
Runners, sometimes long after the URN is in the proto. Having to keep
this list synchronized (and versioned) would be a regression.

[1] 
https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java

The PR taking approach (1) looks good at a first glance (I see others
are reviewing it). Thanks.

> Maximilian Michels  于2019年11月8日周五 上午3:35写道:
>>
>> > While the Go SDK doesn't yet support a State API, Option 3) is what the Go 
>> > SDK does for all non-standard coders (aka custom coders) anyway.
>>
>> For wire transfer, the Java Runner also adds a LengthPrefixCoder for the
>> coder and its subcomponents. The problem is that this is an implicit
>> assumption made. In the Proto, we do not have this represented. This is
>> why **for state requests**, we end up with a
>> "LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on
>> the SDK Harness side. Note that the Python Harness does wrap unknown
>> coders in a LengthPrefixCoder for transferring regular elements, but the
>> LengthPrefixCoder is not preserved for the state requests.
>>
>> In that sense (3) is good because it follows this implicit notion of
>> adding a LengthPrefixCoder for wire transfer, but applies it to state
>> requests.
>>
>> However, option (1) is most reliable because the LengthPrefixCoder is
>> actually in the Proto. So "CustomCoder" will always be represented as
>> "LengthPrefixCoder[CustomCoder]", and only standard coders will be added
>> without a LengthPrefixCoder.
>>
>> > I'd really like to avoid implicit agreements about how the coder that
>> > should be used differs from what's specified in the proto in different
>> > contexts.
>>
>> Option (2) would work on top of the existing logic because replacing a
>> non-standard coder with a "NOOP coder" would just be used by the Runner
>> to produce a serialized version of the key for partitioning. Flink
>> always operates on the serialized key, be it standard or non-standard
>> coder. It wouldn't be necessary to change any of the existing wire
>> transfer logic or representation. I understand that it would be less
>> ideal, but maybe easier to fix for the release.
>>
>> > The key concept here is not "standard coder" but "coder that the
>> > runner does not understand." This knowledge is only in the runner.
>> > Also has the downside of (2).
>>
>> Yes, I had assumed "non-standard" and "unknown" are the same, but the
>> latter can be a subset of the former, i.e. if a Runner does not support
>> all of the standard coders for some reason.
>>
>> > This means that the wire format that the runner sends for the "key" 
>> > represents the exact same wire format it will receive for state requests.
>>
>> The wire format for the entire element is the same. Otherwise we
>> wouldn't be able to process data between the Runner and the SDK Harness.
>> However, the problem is that the way the Runner instantiates the key
>> coder to partition elements, does not match how the SDK encodes the key
>> when it sends a state request to the Runner. Conceptually, those two
>> situations should be the same, but in practice they are not.
>>
>>
>> Now that I thought about it again option (1) is probably the most
>> explicit and in that sense cleanest. However, option (3) is kind of fair
>> because it would just replicate the implicit LengthPrefixCoder behavior
>> we have for general wire transfer also for state requests. Option (2) I
>> suppose is the most implicit and runner-specific, should probably be
>> avoided in the long run.
>>
>> So 

Re: Detecting resources to stage

2019-11-08 Thread Łukasz Gajowy
I figured that it would be good to bump this thread for greater visibility
even though I don't have a strong opinion about this (yet - hopefully, I
will know more later to be able to share ;) ).

Answering the questions Luke asked will unblock this issue:
https://issues.apache.org/jira/browse/BEAM-5495. Solving it is needed for
Java 11 migration (current detecting mechanism does not work with java >
8).



> That said letting the user resolve the jars to stage can be saner instead
> of assuming it is in the classpath/loader. I already have a few cases where
> it will fail cause the transforms load the jars from outside the app
> classloader (transforms are isolated).
>


If I understand correctly, at least in Dataflow runner, if users want to
provide custom resources to stage, they can use filesToStage pipeline
option. Once the option is not null, the runner doesn't detect the
resources automatically and stages resources enlisted in the option
instead. I think this should be the approach common for all runners (if it
is not the case already).

Thanks,
Łukasz


Re: New Contributor

2019-11-08 Thread Andrew Crites
It's crites. Thanks!

On Thu, Nov 7, 2019 at 3:06 PM Kyle Weaver  wrote:

> Can you please share your Jira username?
>
> On Thu, Nov 7, 2019 at 3:04 PM Andrew Crites 
> wrote:
>
>> This is Andrew Crites. I'm making some changes to the Python Dataflow
>> runner. Can someone add me as a contributor for Beam's Jira issue tracker?
>> Apparently I can't be assigned issues right now.
>>
>> Thanks!
>>
>


Re: (Question) SQL integration tests for MongoDb

2019-11-08 Thread Kirill Kozlov
Alternative approach would be to manually start a MongoDb service like it
is done here:
https://github.com/apache/beam/blob/master/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java#L85
Doing it like in the example above should solve my problem.

Thank you for your help!

-
Kirill


On Fri, Nov 8, 2019, 03:09 Michał Walenia 
wrote:

> Won't the command be analogous to what is in the Javadoc of
> MongoDbReadWriteIT? It seems that you don't need to use
> `enableJavaPerformanceTesting`, as `integrationTest` task parses
> `pipelineOptions` parameter.
>
>
>
> On Thu, Nov 7, 2019 at 6:40 PM Kirill Kozlov 
> wrote:
>
>> Thank you for your response!
>>
>> I want to make sure that when tests run on Jenkins they get supplied with
>> pipelines options containing hostName and Port of a running MongoDb service.
>>
>> I'm writing integration test for a MongoDb SQL adapter (located
>> sdks/java/extensions/sql/meta/provider/mongodb).
>> I cannot simply use `enableJavaPerformanceTesting()`, because tests for
>> all adapters are run via the same build file, which has a custom task
>> "integrationTest".
>>
>> I hope this better explains the problem I am trying to tackle.
>>
>> -
>> Kirill
>>
>> On Thu, Nov 7, 2019, 03:36 Michał Walenia 
>> wrote:
>>
>>> Hi,
>>>
>>> What exactly are you trying to do? If you're looking for a way to
>>> provide pipeline options to the MongoDBIOIT, you can pass them via command
>>> line like this:
>>>
>>> ./gradlew integrationTest -p sdks/java/io/mongodb
>>>
>>>
>>>
>>> * -DintegrationTestPipelineOptions='[   "--mongoDBHostName=1.2.3.4",
>>>  "--mongoDBPort=27017",   "--mongoDBDatabaseName=mypass",
>>>  "--numberOfRecords=1000" ]'*
>>>--tests org.apache.beam.sdk.io.mongodb.MongoDbIOIT
>>>-DintegrationTestRunner=direct
>>>
>>> Gradle tasks created with `enableJavaPerformanceTesting()` will allow
>>> such options to be passed.
>>>
>>> If you're trying to do something else, please let me know.
>>>
>>> Regards
>>> Michal
>>>
>>> On Thu, Nov 7, 2019 at 1:44 AM Kirill Kozlov 
>>> wrote:
>>>
 Hi everyone!

 I am trying to test MongoDb Sql Table, but not quite sure how to pass
 pipeline options with the hostName, port, and databaseName used by Jenkins.

 It looks like the integration test for MongoDbIO Connector obtain those
 values from the
 'beam/.test-infra/jenkins/job_PerformanceTests_MongoDBIO_IT.groovy' file
 via calling the following methods in the 'gradle.build' file:
 provideIntegrationTestingDependencies()
 enableJavaPerformanceTesting()

 Sql build file already has a task with the name 'integrationTest'
 defined and does not let us do `enableJavaPerformanceTesting()`.

  I would really appreciate if someone could provide me with a couple of
 pointers on getting this to work.

 -
 Kirill

>>>
>>>
>>> --
>>>
>>> Michał Walenia
>>> Polidea  | Software Engineer
>>>
>>> M: +48 791 432 002 <+48791432002>
>>> E: michal.wale...@polidea.com
>>>
>>> Unique Tech
>>> Check out our projects! 
>>>
>>
>
> --
>
> Michał Walenia
> Polidea  | Software Engineer
>
> M: +48 791 432 002 <+48791432002>
> E: michal.wale...@polidea.com
>
> Unique Tech
> Check out our projects! 
>


Re: RabbitMQ and CheckpointMark feasibility

2019-11-08 Thread Eugene Kirpichov
On Fri, Nov 8, 2019 at 5:57 AM Daniel Robert  wrote:

> Thanks Euguene and Reuven.
>
> In response to Eugene, I'd like to confirm I have this correct: In the
> rabbit-style use case of "stream-system-side checkpointing", it is safe
> (and arguably the correct behavior) to ignore the supplied CheckpointMark
> argument in `createReader(options, checkpointmark)` and in the constructor
> for the and instead always instantiate a new CheckpointMark during
> construction. Is that correct?
>
Yes, this is correct.


> In response to Reuven: noted, however I was mostly using serialization in
> the general sense. That is, there does not seem to be any means of
> deserializing a RabbitMqCheckpointMark such that it can continue to provide
> value to a runner. Whether it's java serialization, avro, or any other
> Coder, the 'channel' itself cannot "come along for the ride", which leaves
> the rest of the internal state mostly unusable except for perhaps some
> historical, immutable use case.
>
> -Danny
> On 11/8/19 2:01 AM, Reuven Lax wrote:
>
> Just to clarify one thing: CheckpointMark does not need to be Java
> Seralizable. All that's needed is do return a Coder for the CheckpointMark
> in getCheckpointMarkCoder.
>
> On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov  wrote:
>
>> Hi Daniel,
>>
>> This is probably insufficiently well documented. The CheckpointMark is
>> used for two purposes:
>> 1) To persistently store some notion of how much of the stream has been
>> consumed, so that if something fails we can tell the underlying streaming
>> system where to start reading when we re-create the reader. This is why
>> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
>> 2) To do acks - to let the underlying streaming system know that the Beam
>> pipeline will never need data up to this CheckpointMark. Acking does not
>> require serializability - runners call ack() on the same in-memory instance
>> of CheckpointMark that was produced by the reader. E.g. this makes sense
>> for RabbitMq or Pubsub.
>>
>> In practice, these two capabilities tend to be mutually exclusive: some
>> streaming systems can provide a serializable CheckpointMark, some can do
>> acks, some can do neither - but very few (or none) can do both, and it's
>> debatable whether it even makes sense for a system to provide both
>> capabilities: usually acking is an implicit form of streaming-system-side
>> checkpointing, i.e. when you re-create the reader you don't actually need
>> to carry over any information from an old CheckpointMark - the necessary
>> state (which records should be delivered) is maintained on the streaming
>> system side.
>>
>> These two are lumped together into one API simply because that was the
>> best design option we came up with (not for lack of trying, but suggestions
>> very much welcome - AFAIK nobody is happy with it).
>>
>> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
>> can do acks. So you can simply ignore the non-serializability.
>>
>> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert 
>> wrote:
>>
>>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>>> As part of this I switched to a pull-based API rather than the
>>> previously-used push-based. This has caused some nebulous problems so
>>> put up a correction PR that I think needs some eyes fairly quickly as
>>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>>> the upgrade but reverts to the same push-based implementation as in 4.x:
>>> https://github.com/apache/beam/pull/9977 )
>>>
>>> Regardless, in trying to get the pull-based API to work, I'm finding the
>>> interactions between rabbitmq and beam with CheckpointMark to be
>>> fundamentally impossible to implement so I'm hoping for some input here.
>>>
>>> CheckointMark itself must be Serializable, presumably this means it gets
>>> shuffled around between nodes. However 'Channel', the tunnel through
>>> which it communicates with Rabbit to ack messages and finalize the
>>> checkpoint, is non-Serializable. Like most other CheckpointMark
>>> implementations, Channel is 'transient'. When a new CheckpointMark is
>>> instantiated, it's given a Channel. If an existing one is supplied to
>>> the Reader's constructor (part of the 'startReader()' interface), the
>>> channel is overwritten.
>>>
>>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>>> than the one that consumed them in the first place. Attempting to do so
>>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>>
>>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>> ).
>>>
>>> Truthfully, I don't really understand how the current implementation is
>>> working; it seems like a happy accident. But I'm curious if someone
>>> could help me debug and implement how to bridge the
>>> re-usable/serializable CheckpointMark requirement in Beam with this
>>> limitation of Rabbit.
>>>
>>> Thanks,
>>> -Daniel Robert

Re: Contributor permission for Beam Jira tickets

2019-11-08 Thread Luke Cwik
Welcome, I have added you as a contributor and assigned BEAM-8579 to you.

On Thu, Nov 7, 2019 at 3:14 PM Changming Ma  wrote:

> Oh, one more thing: my jira account name is: cmma
>
>
>
> On Thu, Nov 7, 2019 at 3:04 PM Changming Ma  wrote:
>
>> Hi,
>> This is Changming, a SWE with Google. I'm working on a GCP DataFlow item
>> and it'll be nice some of my changes can be backported to beam (e.g.,
>> BEAM-8579).
>> Could someone please add me as a contributor for Beam's Jira issue
>> tracker? My github account is cmm08 (email: c...@google.com).
>>
>> Thank you,
>> Changming
>>
>


Re: [spark structured streaming runner] merge to master?

2019-11-08 Thread Kenneth Knowles
On Thu, Nov 7, 2019 at 5:32 PM Etienne Chauchot 
wrote:
>
> Hi guys
>
> @Kenn,
>
> I just wanted to mention that I did answered your question on
dependencies here:
https://lists.apache.org/thread.html/5a85caac41e796c2aa351d835b3483808ebbbd4512b480940d494439@%3Cdev.beam.apache.org%3E

Ah, sorry! In that case there is no problem at all.


> I'm not in favor of having the 2 runners in one jar, the point about
having 2 jars was to:
>
> - avoid making promises to users on a work in progress runner (make it
explicit with a different jar)
> - avoid confusion for them (why are there 2 pipeline options? etc)
>
> If the community believes that there is no confusion or wrong promises
with the one jar solution, we could leave the 2 runners in one jar.
>
> Maybe we could start a vote on that?

It seems unanimous among others to have one jar. There were some
suggestions of how to avoid promises and confusion, like Ryan's most recent
email. Did any of the ideas sound good to you?

Kenn


I have no objection to putting the experimental runner alongside the
> stable, mature runner.  We have some precedence with the portable
> spark runner, and that's worked out pretty well -- at least, I haven't
> heard any complaints from confused users!
>
> That being said:
>
> 1.  It really should be marked @Experimental in the code *and* clearly
> warned in API (javadoc) and documentation.
>
> 2.  Ideally, I'd like to see a warning banner in the logs when it's
> used, pointing to the stable SparkRunner and/or documentation on the
> current known issues.
>
> All my best, Ryan
>
>
>
>
>
>
> > regarding jars:
> >
> > I don't like 3 jars either.
> >
> >
> > Etienne
> >
> > On 31/10/2019 02:06, Kenneth Knowles wrote:
> >
> > Very good points. We definitely ship a lot of code/features in very
> early stages, and there seems to be no problem.
> >
> > I intend mostly to leave this judgment to people like you who know
> better about Spark users.
> >
> > But I do think 1 or 2 jars is better than 3. I really don't like "3
> jars" and I did give two reasons:
> >
> > 1. diamond deps where things overlap
> > 2. figuring out which thing to depend on
> >
> > Both are annoying for users. I am not certain if it could lead to a real
> unsolvable situation. This is just a Java ecosystem problem so I feel
> qualified to comment.
> >
> > I did also ask if there were major dependency differences between the
> two that could cause problems for users. This question was dropped and no
> one cares to comment so I assume it is not an issue. So then I favor having
> just 1 jar with both runners.
> >
> > Kenn
> >
> > On Wed, Oct 30, 2019 at 2:46 PM Ismaël Mejía  wrote:
> >>
> >> I am still a bit lost about why we are discussing options without
> giving any
> >> arguments or reasons for the options? Why is 2 modules better than 3 or
> 3 better
> >> than 2, or even better, what forces us to have something different than
> a single
> >> module?
> >>
> >> What are the reasons for wanting to have separate jars? If the issue is
> that the
> >> code is unfinished or not passing the tests, the impact for end users
> is minimal
> >> because they cannot accidentally end up running the new runner, and if
> they
> >> decide to do so we can warn them it is at their own risk and not ready
> for
> >> production in the documentation + runner.
> >>
> >> If the fear is that new code may end up being intertwined with the
> classic and
> >> portable runners and have some side effects. We have the
> ValidatesRunner +
> >> Nexmark in the CI to cover this so again I do not see what is the
> problem that
> >> requires modules to be separate.
> >>
> >> If the issue is being uncomfortable about having in-progress code in
> released
> >> artifacts we have been doing this in Beam forever, for example most of
> the work
> >> on portability and Schema/SQL, and all of those were still part of
> artifacts
> >> long time before they were ready for prime use, so I still don't see
> why this
> >> case is different to require different artifacts.
> >>
> >> I have the impression we are trying to solve a non-issue by adding a
> lot of
> >> artificial complexity (in particular to the users), or am I missing
> something
> >> else?
> >>
> >> On Wed, Oct 30, 2019 at 7:40 PM Kenneth Knowles 
> wrote:
> >> >
> >> > Oh, I mean that we ship just 2 jars.
> >> >
> >> > And since Spark users always build an uber jar, they can still depend
> on both of ours and be able to switch runners with a flag.
> >> >
> >> > I really dislike projects shipping overlapping jars. It is confusing
> and causes major diamond dependency problems.
> >> >
> >> > Kenn
> >> >
> >> > On Wed, Oct 30, 2019 at 11:12 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
> >> >>
> >> >> Yes, agree, two jars included in uber jar will work in the similar
> way. Though having 3 jars looks still quite confusing for me.
> >> >>
> >> >> On 29 Oct 2019, at 23:54, Kenneth Knowles  wrote:
> >> >>
> >> >> Is it just as easy to have two jars and 

Re: Deprecate some or all of TestPipelineOptions?

2019-11-08 Thread Luke Cwik
It can be marked as deprecated and we can remove its usage everywhere but
leave this interface and mark it for removal at some future time.

On Thu, Nov 7, 2019 at 2:23 PM Ismaël Mejía  wrote:

> Thanks for bringing this to the ML Brian
>
> +1 For full TestPipelineOptions deprecation. Even worth to remove it,
> bad part is that this class resides in 'sdks/core/main/java' and not
> in testing as I imagined so this could count as a 'breaking' change.
>
> On Thu, Nov 7, 2019 at 8:27 PM Luke Cwik  wrote:
> >
> > There was issue with asynchrony of p.run(), some runners blocked till
> the pipeline was complete with p.run() which was never meant to be the
> intent.
> >
> > The test timeout one makes sense to be able to configure it per runner
> (since Dataflow takes a lot longer than other runners) but we may be able
> to configure a Junit test timeout attribute instead.
> >
> > I would be for getting rid of them.
> >
> >
> > On Wed, Nov 6, 2019 at 3:36 PM Robert Bradshaw 
> wrote:
> >>
> >> +1 to all of these are probably obsolete at this point and would be
> >> nice to remove.
> >>
> >>
> >> On Wed, Nov 6, 2019 at 3:00 PM Kenneth Knowles  wrote:
> >> >
> >> > Good find. I think TestPipelineOptions is from very early days. It
> makes sense to me that these are all obsolete. Some guesses, though I
> haven't dug through commit history to confirm:
> >> >
> >> >  - TempRoot: a while ago TempLocation was optional, so I think this
> would provide a default for things like gcpTempLocation and stagingLocation
> >> >  - OnSuccessMatcher: for runners where pipeline used to not terminate
> in streaming mode. Now I think every runner can successfully
> waitUntilFinish. Also the current API for waitUntilFinish went through some
> evolutions around asynchrony so it wasn't always a good choice.
> >> >  - OnCreateMatcher: just for symmetry? I don't know
> >> >  - TestTimeoutSeconds: probably also for the
> asychrony/waitUntilfinish issue
> >> >
> >> > Kenn
> >> >
> >> > On Wed, Nov 6, 2019 at 12:19 PM Brian Hulette 
> wrote:
> >> >>
> >> >> I recently came across TestPipelineOptions, and now I'm wondering if
> maybe it should be deprecated. It only seems to actually be supported for
> Spark and Dataflow (via TestSparkRunner and TestDataflowRunner), and I
> think it may make more sense to move the functionality it provides into the
> tests that need it.
> >> >>
> >> >> TestPipelineOptions currently has four attributes:
> >> >>
> >> >> # TempRoot
> >> >> It's purpose isn't documented, but many tests read TempRoot and use
> it to set a TempLocation (example). I think this attribute makes sense
> (e.g. we can set TempRoot once and each test has its own subdirectory), but
> I'm not sure. Can anyone confirm the motivation for it? I'd like to at
> least add a docstring for it.
> >> >>
> >> >> # OnCreateMatcher
> >> >> A way to register a matcher that will be checked right after a
> pipeline has started. It's never set except for in TestDataflowRunnerTest,
> so I think this is absolutely safe to remove.
> >> >>
> >> >> # OnSuccessMatcher
> >> >> A way to register a matcher that will be checked right after a
> pipeline has successfully completed. This is used in several tests
> (RequiresStableInputIT, WordCountIT, ... 8 total occurrences), but I don't
> see why they couldn't all be replaced with a `p.run().waitUntilFinish()`,
> followed by an assert.
> >> >>
> >> >> I think the current approach is actually dangerous, because running
> these tests with runners other than TestDataflowRunner or TestSparkRunner
> means the matchers are never actually checked. This is actually how I came
> across TestPipelineOptions - I tried running a test with the DirectRunner
> and couldn't make it fail.
> >> >>
> >> >> # TestTimeoutSeconds
> >> >> Seems to just be a wrapper for `waitUntilFinish(duration)`, and only
> used in one place. I think it would be cleaner for the test to be
> responsible for calling waitUntilFinish (which we do elsewhere), the only
> drawback is it requires a small refactor so the test has access to the
> PipelineResult object.
> >> >>
> >> >>
> >> >> So I have a couple of questions for the community
> >> >> 1) Are there thoughts on TempRoot? Can we get rid of it?
> >> >> 2) Are there any objections to removing the other three attributes?
> Am I missing something? Unless there are any objections I think I'll write
> a patch to remove them.
> >> >>
> >> >> Thanks,
> >> >> Brian
>


Re: [discuss] More dimensions for the Capability Matrix

2019-11-08 Thread Brian Hulette
> Does it make sense to do this?
I think this makes a lot of sense. Plus it's a good opportunity to refresh
the UX of [1].

> what's a good way of doing it? Should we expand the existing Capability
Matrix to support SDKs as well? Or should we have a new one?
To me there are two aspects to this: how we model the data, and how we
present the data.

For modelling the data:
Do we need to maintain the full 3-dimensional 
matrix? That seems untenable to me. With portability, I think the runner
and SDK matrix should be completely independent, so it should be safe to
just maintain , and  matrices and model
the 3-dimensional matrix as the cross-product of the two.
Maybe we should have a new capability matrix just for portable runners so
we can exploit this property?

For presenting the data:
I think there would be value in just presenting 
(basically what we have now in [1]), and also presenting 
separately. The  display could serve as documentation too,
with examples of how to do Y in each SDK.
Maybe there would also be value in presenting  in
some fancy UI so an architect can quickly answer "what can I do with SDK Z
on Runner X", but I'm not sure what that would look like.

[1] https://beam.apache.org/documentation/runners/capability-matrix/

On Thu, Nov 7, 2019 at 10:09 PM Thomas Weise  wrote:

> FWIW there are currently at least 2 instances of capability matrix [1] [2].
>
> [1] has been in need of a refresh for a while.
>
> [2] is more useful but only covers portable runners and is hard to find.
>
> Thomas
>
> [1] https://beam.apache.org/documentation/runners/capability-matrix/
> [2] https://s.apache.org/apache-beam-portability-support-table
>
> On Thu, Nov 7, 2019 at 7:52 PM Pablo Estrada  wrote:
>
>> Hi all,
>> I think this is a relatively common question:
>>
>> - Can I do X with runner Y, and SDK Z?
>>
>> The answers vary significantly between SDK and Runner pairs. This makes
>> it such that the current Capability Matrix falls somewhat short when
>> potential users / solutions architects / etc are trying to decide to adopt
>> Beam, and which Runner / SDK to use.
>>
>> I think we need to put some effort in building a capability matrix that
>> expresses this information - and maintain it updated.
>>
>> I would like to discuss a few things:
>> - Does it make sense to do this?
>> - If it does, what's a good way of doing it? Should we expand the
>> existing Capability Matrix to support SDKs as well? Or should we have a new
>> one?
>> - Any other thoughts you may have about the idea.
>>
>> Best
>> -P.
>>
>


Re: Triggers still finish and drop all data

2019-11-08 Thread Steve Niemitz
Yeah that looks like what I had in mind too.  I think the most useful
notification output would be a KV of (K, summary)?

On Fri, Nov 8, 2019 at 12:38 PM Kenneth Knowles  wrote:

> This sounds like a useful feature, if I understand it: a generic transform
> (build on a generic stateful DoFn) where the end-user provides a monotonic
> predicate over the input it has seen. It emits a notification exactly once
> when the predicate is first satisfied. To be efficient, it will also need
> some form of summarization over the input seen.
>
> Notify
>   .withSummarizer(combineFn)
>   .withPredicate(summary -> ...)
>
> Something like that? The complexity is not much less than just writing a
> stateful DoFn directly, but the boilerplate is much less.
>
> Kenn
>
> On Thu, Nov 7, 2019 at 2:02 PM Steve Niemitz  wrote:
>
>> Interestingly enough, we just had a use case come up that I think could
>> have been solved by finishing triggers.
>>
>> Basically, we want to emit a notification when a certain threshold is
>> reached (in this case, we saw at least N elements for a given key), and
>> then never notify again within that window.  As mentioned, we can
>> accomplish this using a stateful DoFn as mentioned above, but I thought it
>> was interesting that this just came up, and wanted to share.
>>
>> Maybe it'd be worth building something to simulate this into the SDK?
>>
>> On Mon, Nov 4, 2019 at 8:15 PM Kenneth Knowles  wrote:
>>
>>> By the way, adding this guard uncovered two bugs in Beam's Java
>>> codebase, luckily only benchmarks and tests. There were *no* non-buggy
>>> instances of a finishing trigger. They both declare allowed lateness that
>>> is never used.
>>>
>>> Nexmark query 10:
>>>
>>> // Clear fancy triggering from above.
>>> .apply(
>>> Window.>into(...)
>>> .triggering(AfterWatermark.pastEndOfWindow())
>>> // We expect no late data here, but we'll assume the
>>> worst so we can detect any.
>>> .withAllowedLateness(Duration.standardDays(1))
>>> .discardingFiredPanes())
>>>
>>> This is nonsensical: the trigger will fire once and close, never firing
>>> again. So the allowed lateness has no effect except to change counters from
>>> "dropped due to lateness" to "dropped due to trigger closing". The intent
>>> would appear to be to restore the default triggering, but it failed.
>>>
>>> PipelineTranslationTest:
>>>
>>>
>>>  Window.into(FixedWindows.of(Duration.standardMinutes(7)))
>>> .triggering(
>>> AfterWatermark.pastEndOfWindow()
>>>
>>> .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
>>> .accumulatingFiredPanes()
>>> .withAllowedLateness(Duration.standardMinutes(3L)));
>>>
>>> Again, the allowed lateness has no effect. This test is just to test
>>> portable proto round-trip. But still it is odd to write a nonsensical
>>> pipeline for this.
>>>
>>> Takeaway: experienced Beam developers never use this pattern, but they
>>> still get it wrong and create pipelines that would have data loss bugs
>>> because of it.
>>>
>>> Since there is no other discussion here, I will trust the community is
>>> OK with this change and follow Jan's review of my implementation of his
>>> idea.
>>>
>>> Kenn
>>>
>>>
>>> On Thu, Oct 31, 2019 at 4:06 PM Kenneth Knowles  wrote:
>>>
 Opened https://github.com/apache/beam/pull/9960 for this idea. This
 will alert users to broken pipelines and force them to alter them.

 Kenn

 On Thu, Oct 31, 2019 at 2:12 PM Kenneth Knowles 
 wrote:

> On Thu, Oct 31, 2019 at 2:11 AM Jan Lukavský  wrote:
>
>> Hi Kenn,
>>
>> does there still remain some use for trigger to finish? If we don't
>> drop
>> data, would it still be of any use to users? If not, would it be
>> better
>> to just remove the functionality completely, so that users who use it
>> (and it will possibly break for them) are aware of it at compile time?
>>
>> Jan
>>
>
> Good point. I believe there is no good use for a top-level trigger
> finishing. As mentioned, the intended uses aren't really met by triggers,
> but are met by stateful DoFn.
>
> Eugene's bug even has this title :-). We could not change any behavior
> but just reject pipelines with broken top-level triggers. This is probably
> a better solution. Because if a user has a broken trigger, the new 
> behavior
> is probably not enough to magically fix their pipeline. They are better 
> off
> knowing that they are broken and fixing it.
>
> And at that point, there is a lot of dead code and my PR is really
> just cleaning it up as a simplification.
>
> Kenn
>
>
>
>> On 10/30/19 11:26 PM, Kenneth Knowles wrote:
>> > Problem: a trigger can "finish" which causes a window to "close"
>> and
>> > drop all remaining data arriving for 

Re: Triggers still finish and drop all data

2019-11-08 Thread Kenneth Knowles
This sounds like a useful feature, if I understand it: a generic transform
(build on a generic stateful DoFn) where the end-user provides a monotonic
predicate over the input it has seen. It emits a notification exactly once
when the predicate is first satisfied. To be efficient, it will also need
some form of summarization over the input seen.

Notify
  .withSummarizer(combineFn)
  .withPredicate(summary -> ...)

Something like that? The complexity is not much less than just writing a
stateful DoFn directly, but the boilerplate is much less.

Kenn

On Thu, Nov 7, 2019 at 2:02 PM Steve Niemitz  wrote:

> Interestingly enough, we just had a use case come up that I think could
> have been solved by finishing triggers.
>
> Basically, we want to emit a notification when a certain threshold is
> reached (in this case, we saw at least N elements for a given key), and
> then never notify again within that window.  As mentioned, we can
> accomplish this using a stateful DoFn as mentioned above, but I thought it
> was interesting that this just came up, and wanted to share.
>
> Maybe it'd be worth building something to simulate this into the SDK?
>
> On Mon, Nov 4, 2019 at 8:15 PM Kenneth Knowles  wrote:
>
>> By the way, adding this guard uncovered two bugs in Beam's Java codebase,
>> luckily only benchmarks and tests. There were *no* non-buggy instances of a
>> finishing trigger. They both declare allowed lateness that is never used.
>>
>> Nexmark query 10:
>>
>> // Clear fancy triggering from above.
>> .apply(
>> Window.>into(...)
>> .triggering(AfterWatermark.pastEndOfWindow())
>> // We expect no late data here, but we'll assume the
>> worst so we can detect any.
>> .withAllowedLateness(Duration.standardDays(1))
>> .discardingFiredPanes())
>>
>> This is nonsensical: the trigger will fire once and close, never firing
>> again. So the allowed lateness has no effect except to change counters from
>> "dropped due to lateness" to "dropped due to trigger closing". The intent
>> would appear to be to restore the default triggering, but it failed.
>>
>> PipelineTranslationTest:
>>
>>Window.into(FixedWindows.of(Duration.standardMinutes(7)))
>> .triggering(
>> AfterWatermark.pastEndOfWindow()
>>
>> .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
>> .accumulatingFiredPanes()
>> .withAllowedLateness(Duration.standardMinutes(3L)));
>>
>> Again, the allowed lateness has no effect. This test is just to test
>> portable proto round-trip. But still it is odd to write a nonsensical
>> pipeline for this.
>>
>> Takeaway: experienced Beam developers never use this pattern, but they
>> still get it wrong and create pipelines that would have data loss bugs
>> because of it.
>>
>> Since there is no other discussion here, I will trust the community is OK
>> with this change and follow Jan's review of my implementation of his idea.
>>
>> Kenn
>>
>>
>> On Thu, Oct 31, 2019 at 4:06 PM Kenneth Knowles  wrote:
>>
>>> Opened https://github.com/apache/beam/pull/9960 for this idea. This
>>> will alert users to broken pipelines and force them to alter them.
>>>
>>> Kenn
>>>
>>> On Thu, Oct 31, 2019 at 2:12 PM Kenneth Knowles  wrote:
>>>
 On Thu, Oct 31, 2019 at 2:11 AM Jan Lukavský  wrote:

> Hi Kenn,
>
> does there still remain some use for trigger to finish? If we don't
> drop
> data, would it still be of any use to users? If not, would it be
> better
> to just remove the functionality completely, so that users who use it
> (and it will possibly break for them) are aware of it at compile time?
>
> Jan
>

 Good point. I believe there is no good use for a top-level trigger
 finishing. As mentioned, the intended uses aren't really met by triggers,
 but are met by stateful DoFn.

 Eugene's bug even has this title :-). We could not change any behavior
 but just reject pipelines with broken top-level triggers. This is probably
 a better solution. Because if a user has a broken trigger, the new behavior
 is probably not enough to magically fix their pipeline. They are better off
 knowing that they are broken and fixing it.

 And at that point, there is a lot of dead code and my PR is really just
 cleaning it up as a simplification.

 Kenn



> On 10/30/19 11:26 PM, Kenneth Knowles wrote:
> > Problem: a trigger can "finish" which causes a window to "close" and
> > drop all remaining data arriving for that window.
> >
> > This has been discussed many times and I thought fixed, but it seems
> > to not be fixed. It does not seem to have its own Jira or thread
> that
> > I can find. But here are some pointers:
> >
> >  - data loss bug:
> >
> 

Re: [DISCUSS] Avoid redundant encoding and decoding between runner and harness

2019-11-08 Thread Luke Cwik
On Thu, Nov 7, 2019 at 7:36 PM Kenneth Knowles  wrote:

>
>
> On Thu, Nov 7, 2019 at 9:19 AM Luke Cwik  wrote:
>
>> I did suggest one other alternative on Jincheng's PR[1] which was to
>> allow windowless values to be sent across the gRPC port. The SDK would then
>> be responsible for ensuring that the execution didn't access any properties
>> that required knowledge of the timestamp, pane or window. This is different
>> then adding the ValueOnlyWindowedValueCoder as a model coder because it
>> allows SDKs to pass around raw values to functions without any windowing
>> overhead which could be useful for things like the side input window
>> mapping or window merging functions we have.
>>
>
> When you say "pass around" what does it mean? If it is over the wire,
> there is already no overhead to ValueOnlyWindowedValueCoder. So do you mean
> the overhead of having the layer of boxing of WindowedValue? I would assume
> all non-value components of the WindowedValue from
> ValueOnlyWindowedValueCoder are pointers to a single shared immutable
> instance carried by the coder instance.
>

I was referring to the layer of boxing of WindowedValue. My concern wasn't
the performance overhead of passing around a wrapper object but the
cognitive overhead of understanding why everything needs to be wrapped in a
windowed value. Since you have been working on SQL for some time, this
would be analogous to executing a UDF and making all the machinery around
it take WindowedValue instead of T.


> I think raw values can already be passed to functions, no? The main thing
> is that elements in a PCollection always have a window, timestamp, and
> paneinfo. Not all values are elements. Is there a specific case you have in
> mind? I would not expect WindowMappingFn or window merging fn to be passing
> "elements" but just values of the appropriate type for the function.
>

This is about the machinery around WindowMappingFn/WindowMergingFn. For
example the implementation around WindowMappingFn takes a
WindowedValue and unwraps it forwarding it to the WindowMappingFn
and then takes the result and wraps it in a WindowedValue and
returns that to the runner.


>
>
>> On Thu, Nov 7, 2019 at 8:48 AM Robert Bradshaw 
>> wrote:
>>
>>> I think there is some misunderstanding about what is meant by option
>>> 2. What Kenn (I think) and I are proposing is not a WindowedValueCoder
>>> whose window/timestamp/paneinfo coders are parameterized to be
>>> constant coders, but a WindowedValueCoder whose
>>> window/timestamp/paneinfo values are specified as constants in the
>>> coder.
>>>
>>> Let's call this NewValueOnlyWindowedValueCoder, and is parameterized
>>> by a window, timestamp, and pane info instance
>>>
>>> The existing ValueOnlyWindowedValueCoder is literally
>>> NewValueOnlyWindowedValueCoder(GlobalWindow, MIN_TIMESTAMP,
>>> PaneInfo.NO_FIRING). Note in particular that using the existing
>>> ValueOnlyWindowedValueCoder would give the wrong timestamp and pane
>>> info if it is use for the result of a GBK, which I think is the loss
>>> of consistency referred to here.
>>>
>>
> Yes, this is exactly what I am proposing and sounds like what "approach 2"
> is. I think this approach "just works". It is simpler and more efficient
> than "WindowedValueCoder.of(ValueCoder, WindowCoder, TimestampCoder,
> PaneInfoCoder)" which would require some overhead even if WindowCoder,
> TimestampCoder and PaneInfoCoder were constant coders consuming zero bytes.
>
> Kenn
>
>
>
>> On Thu, Nov 7, 2019 at 1:03 AM jincheng sun 
>>> wrote:
>>> >
>>> > Thanks for your feedback and the valuable comments, Kenn & Robert!
>>> >
>>> > I think your comments are more comprehensive and enlighten me a lot.
>>> The two proposals which I mentioned above are to reuse the existing coder
>>> (FullWindowedValueCoder and ValueOnlyWindowedValueCoder). Now, with your
>>> comments, I think we can further abstract 'FullWindowedValueCoder' and
>>> 'ValueOnlyWindowedValueCoder', that is, we can rename
>>> 'FullWindowedValueCoder' to 'WindowedValueCoder', and make the coders of
>>> window/timestamp/pane configurable. Then we can remove
>>> 'ValueOnlyWindowedValueCoder' and need to add a mount of constant coders
>>> for window/timestamp/pane.
>>> >
>>> > I have replied your comments on the doc, and quick feedback as
>>> following:
>>> >
>>> > Regarding to "Approach 2: probably no SDK harness work / compatible
>>> with existing Beam model so no risk of introducing inconsistency",if we
>>> "just puts default window/timestamp/pane info on elements" and don't change
>>> the original coder, then the performance is not optimized. If we want to
>>> get the best performance, then the default coder of Window/timestamp/pane
>>> should be constant coder. In this case the SDK harnesses need to be aware
>>> of the constant coder and there will be some development work in the SDK
>>> harness. Besides, the SDK harness also needs to make the coders for
>>> window/timestamp/pane configurable and this will 

Re: Key encodings for state requests

2019-11-08 Thread Maximilian Michels
Thank you for your comments. Here is the updated PR according to option 
(1): https://github.com/apache/beam/pull/9997


-Max

On 08.11.19 11:08, jincheng sun wrote:

Hi,

Sorry for my late reply. It seems the conclusion has been reached. I 
just want to share my personal thoughts.


Generally, both option 1 and 3 make sense to me.

 >> The key concept here is not "standard coder" but "coder that the
 >> runner does not understand." This knowledge is only in the runner.
 >> Also has the downside of (2).

 >Yes, I had assumed "non-standard" and "unknown" are the same, but the
 >latter can be a subset of the former, i.e. if a Runner does not support
 >all of the standard coders for some reason.

I'm also assume that "non-standard" and "unknown" are the same. 
Currently, in the runner side[1] it
decides whether the coder is unknown(wrap with length prefix coder) 
according to whether the coder is among
the standard coders. It will not communicate with harness to make this 
decision.


So, from my point of view, we can update the PR according to option 1 or 3.

[1] 
https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62


Maximilian Michels mailto:m...@apache.org>> 于2019年11月 
8日周五 上午3:35写道:


 > While the Go SDK doesn't yet support a State API, Option 3) is
what the Go SDK does for all non-standard coders (aka custom coders)
anyway.

For wire transfer, the Java Runner also adds a LengthPrefixCoder for
the
coder and its subcomponents. The problem is that this is an implicit
assumption made. In the Proto, we do not have this represented. This is
why **for state requests**, we end up with a
"LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on
the SDK Harness side. Note that the Python Harness does wrap unknown
coders in a LengthPrefixCoder for transferring regular elements, but
the
LengthPrefixCoder is not preserved for the state requests.

In that sense (3) is good because it follows this implicit notion of
adding a LengthPrefixCoder for wire transfer, but applies it to state
requests.

However, option (1) is most reliable because the LengthPrefixCoder is
actually in the Proto. So "CustomCoder" will always be represented as
"LengthPrefixCoder[CustomCoder]", and only standard coders will be
added
without a LengthPrefixCoder.

 > I'd really like to avoid implicit agreements about how the coder that
 > should be used differs from what's specified in the proto in
different
 > contexts.

Option (2) would work on top of the existing logic because replacing a
non-standard coder with a "NOOP coder" would just be used by the Runner
to produce a serialized version of the key for partitioning. Flink
always operates on the serialized key, be it standard or non-standard
coder. It wouldn't be necessary to change any of the existing wire
transfer logic or representation. I understand that it would be less
ideal, but maybe easier to fix for the release.

 > The key concept here is not "standard coder" but "coder that the
 > runner does not understand." This knowledge is only in the runner.
 > Also has the downside of (2).

Yes, I had assumed "non-standard" and "unknown" are the same, but the
latter can be a subset of the former, i.e. if a Runner does not support
all of the standard coders for some reason.

 > This means that the wire format that the runner sends for the
"key" represents the exact same wire format it will receive for
state requests.

The wire format for the entire element is the same. Otherwise we
wouldn't be able to process data between the Runner and the SDK
Harness.
However, the problem is that the way the Runner instantiates the key
coder to partition elements, does not match how the SDK encodes the key
when it sends a state request to the Runner. Conceptually, those two
situations should be the same, but in practice they are not.


Now that I thought about it again option (1) is probably the most
explicit and in that sense cleanest. However, option (3) is kind of
fair
because it would just replicate the implicit LengthPrefixCoder behavior
we have for general wire transfer also for state requests. Option (2) I
suppose is the most implicit and runner-specific, should probably be
avoided in the long run.

So I'd probably opt for (1) and I would update the PR[1] rather soon
because this currently blocks the release, as this is a regression from
2.16.0.[2]


-Max

[1] https://github.com/apache/beam/pull/9997
[2] (In 2.16.0 it worked for Python because the Runner used a
ByteArrayCoder with the OUTER encoding context for the key which was
basically option (2). Only problem that, for standard coders the Java

Re: RabbitMQ and CheckpointMark feasibility

2019-11-08 Thread Daniel Robert

Thanks Euguene and Reuven.

In response to Eugene, I'd like to confirm I have this correct: In the 
rabbit-style use case of "stream-system-side checkpointing", it is safe 
(and arguably the correct behavior) to ignore the supplied 
CheckpointMark argument in `createReader(options, checkpointmark)` and 
in the constructor for the and instead always instantiate a new 
CheckpointMark during construction. Is that correct?


In response to Reuven: noted, however I was mostly using serialization 
in the general sense. That is, there does not seem to be any means of 
deserializing a RabbitMqCheckpointMark such that it can continue to 
provide value to a runner. Whether it's java serialization, avro, or any 
other Coder, the 'channel' itself cannot "come along for the ride", 
which leaves the rest of the internal state mostly unusable except for 
perhaps some historical, immutable use case.


-Danny

On 11/8/19 2:01 AM, Reuven Lax wrote:
Just to clarify one thing: CheckpointMark does not need to be Java 
Seralizable. All that's needed is do return a Coder for the 
CheckpointMark in getCheckpointMarkCoder.


On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov > wrote:


Hi Daniel,

This is probably insufficiently well documented. The
CheckpointMark is used for two purposes:
1) To persistently store some notion of how much of the stream has
been consumed, so that if something fails we can tell the
underlying streaming system where to start reading when we
re-create the reader. This is why CheckpointMark is Serializable.
E.g. this makes sense for Kafka.
2) To do acks - to let the underlying streaming system know that
the Beam pipeline will never need data up to this CheckpointMark.
Acking does not require serializability - runners call ack() on
the same in-memory instance of CheckpointMark that was produced by
the reader. E.g. this makes sense for RabbitMq or Pubsub.

In practice, these two capabilities tend to be mutually exclusive:
some streaming systems can provide a serializable CheckpointMark,
some can do acks, some can do neither - but very few (or none) can
do both, and it's debatable whether it even makes sense for a
system to provide both capabilities: usually acking is an implicit
form of streaming-system-side checkpointing, i.e. when you
re-create the reader you don't actually need to carry over any
information from an old CheckpointMark - the necessary state
(which records should be delivered) is maintained on the streaming
system side.

These two are lumped together into one API simply because that was
the best design option we came up with (not for lack of trying,
but suggestions very much welcome - AFAIK nobody is happy with it).

RabbitMQ is under #2 - it can't do serializable checkpoint marks,
but it can do acks. So you can simply ignore the non-serializability.

On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
mailto:daniel.rob...@acm.org>> wrote:

(Background: I recently upgraded RabbitMqIO from the 4.x to
5.x library.
As part of this I switched to a pull-based API rather than the
previously-used push-based. This has caused some nebulous
problems so
put up a correction PR that I think needs some eyes fairly
quickly as
I'd consider master to be broken for rabbitmq right now. The
PR keeps
the upgrade but reverts to the same push-based implementation
as in 4.x:
https://github.com/apache/beam/pull/9977 )

Regardless, in trying to get the pull-based API to work, I'm
finding the
interactions between rabbitmq and beam with CheckpointMark to be
fundamentally impossible to implement so I'm hoping for some
input here.

CheckointMark itself must be Serializable, presumably this
means it gets
shuffled around between nodes. However 'Channel', the tunnel
through
which it communicates with Rabbit to ack messages and finalize
the
checkpoint, is non-Serializable. Like most other CheckpointMark
implementations, Channel is 'transient'. When a new
CheckpointMark is
instantiated, it's given a Channel. If an existing one is
supplied to
the Reader's constructor (part of the 'startReader()'
interface), the
channel is overwritten.

*However*, Rabbit does not support 'ack'ing messages on a
channel other
than the one that consumed them in the first place. Attempting
to do so
results in a '406 (PRECONDITION-FAILED) - unknown delivery
tag'. (See

https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed

).

Truthfully, I don't really understand how the current
implementation is
working; it seems like a happy accident. But I'm curious if

Re: Beam runner statsd metrics

2019-11-08 Thread Kush Kumar Sharma
Hi Maximilian,

I have used Beam's Metrics in our pipeline implementation but it's of no
use.
I just want to understand whether beam's metrics system automatically sends
these counters to flink's statsd implementation. FYI I do get flink's
general metrics like taskSlotsTotal, numRunningJobs, etc in my
telegraf->influx stack which means flink is able to communicate with statsd
just fine.

public class KafkaRecordToMessageDoFn extends DoFn, KV> {
private Counter counter =
Metrics.counter(KafkaRecordToMessageDoFn.class, "my-counter");

@ProcessElement
public void processElement(ProcessContext c) throws Exception {

   counter.inc();

}

}

Regards
Kush Sharma


On Fri, Nov 8, 2019 at 4:10 PM Maximilian Michels  wrote:

> Hi Kush,
>
> Beam has its own Metrics[1] which are reported via the Flink metric
> system. You may want to use those and utilize the Flink statsd reporter:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#statsd-orgapacheflinkmetricsstatsdstatsdreporter
>
> Cheers,
> Max
>
> [1]
>
> https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/metrics/Metrics.html
>
> On 08.11.19 11:31, Kush Kumar Sharma wrote:
> > Hi Devs!
> >
> > I am trying to use a *Statsd* client in beam to export some runner
> > metrics. I am able to extract out metrics from the base application but
> > once the job is submitted to its runner(in this case I am using
> > *FlinkRunner*), statsd client stops working. This is a streaming job and
> > I need to extract out those metrics continuously(like how many files are
> > getting processed, etc).
> >
> > Are there any helper classes for this particular use case in beam? If
> > not, are there any flink specific classes that I can use here? I have
> > seen *MetricResults*(from *FlinkRunnerResult*) but it looks like I can
> > only use that once the job is done not when in progress.
> >
> > My current statsd client is
> https://github.com/tim-group/java-statsd-client
> >
> > Regards
> > Kush Sharma
>


Re: [Discuss] Beam Summit 2020 Dates & locations

2019-11-08 Thread jincheng sun
+1 for extend the discussion to the user mailing list?

Maximilian Michels  于2019年11月8日周五 下午6:32写道:

> The dates sounds good to me. I agree that the bay area has an advantage
> because of its large tech community. On the other hand, it is a question
> of how we run the event. For Berlin we managed to get about 200
> attendees to Berlin, but for the BeamSummit in Las Vegas with ApacheCon
> the attendance was much lower.
>
> Should this also be discussed on the user mailing list?
>
> Cheers,
> Max
>
> On 07.11.19 22:50, Alex Van Boxel wrote:
> > For date wise, I'm wondering why we should switching the Europe and NA
> > one, this would mean that the Berlin and the new EU summit would be
> > almost 1.5 years apart.
> >
> >   _/
> > _/ Alex Van Boxel
> >
> >
> > On Thu, Nov 7, 2019 at 8:43 PM Ahmet Altay  > > wrote:
> >
> > I prefer bay are for NA summit. My reasoning is that there is a
> > criticall mass of contributors and users in that location, probably
> > more than alternative NA locations. I was not involved with planning
> > recently and I do not know if there were people who could attend due
> > to location previously. If that is the case, I agree with Elliotte
> > on looking for other options.
> >
> > Related to dates: March (Asia) and mid-May (NA) dates are a bit
> > close. Mid-June for NA might be better to spread events. Other
> > pieces looks good.
> >
> > Ahmet
> >
> > On Thu, Nov 7, 2019 at 7:09 AM Elliotte Rusty Harold
> > mailto:elh...@ibiblio.org>> wrote:
> >
> > The U.S. sadly is not a reliable destination for international
> > conferences these days. Almost every conference I go to, big and
> > small, has at least one speaker, sometimes more, who can't get
> into
> > the country. Canada seems worth considering. Vancouver,
> > Montreal, and
> > Toronto are all convenient.
> >
> > On Wed, Nov 6, 2019 at 2:17 PM Griselda Cuevas  > > wrote:
> >  >
> >  > Hi Beam Community!
> >  >
> >  > I'd like to kick off a thread to discuss potential dates and
> > venues for the 2020 Beam Summits.
> >  >
> >  > I did some research on industry conferences happening in 2020
> > and pre-selected a few ranges as follows:
> >  >
> >  > (2 days) NA between mid-May and mid-June
> >  > (2 days) EU mid October
> >  > (1 day) Asia Mini Summit:  March
> >  >
> >  > I'd like to hear your thoughts on these dates and get
> > consensus on exact dates as the convo progresses.
> >  >
> >  > For locations these are the options I reviewed:
> >  >
> >  > NA: Austin Texas, Berkeley California, Mexico City.
> >  > Europe: Warsaw, Barcelona, Paris
> >  > Asia: Singapore
> >  >
> >  > Let the discussion begin!
> >  > G (on behalf of the Beam Summit Steering Committee)
> >  >
> >  >
> >  >
> >
> >
> > --
> > Elliotte Rusty Harold
> > elh...@ibiblio.org 
> >
>


Re: (Question) SQL integration tests for MongoDb

2019-11-08 Thread Michał Walenia
Won't the command be analogous to what is in the Javadoc of
MongoDbReadWriteIT? It seems that you don't need to use
`enableJavaPerformanceTesting`, as `integrationTest` task parses
`pipelineOptions` parameter.



On Thu, Nov 7, 2019 at 6:40 PM Kirill Kozlov 
wrote:

> Thank you for your response!
>
> I want to make sure that when tests run on Jenkins they get supplied with
> pipelines options containing hostName and Port of a running MongoDb service.
>
> I'm writing integration test for a MongoDb SQL adapter (located
> sdks/java/extensions/sql/meta/provider/mongodb).
> I cannot simply use `enableJavaPerformanceTesting()`, because tests for
> all adapters are run via the same build file, which has a custom task
> "integrationTest".
>
> I hope this better explains the problem I am trying to tackle.
>
> -
> Kirill
>
> On Thu, Nov 7, 2019, 03:36 Michał Walenia 
> wrote:
>
>> Hi,
>>
>> What exactly are you trying to do? If you're looking for a way to provide
>> pipeline options to the MongoDBIOIT, you can pass them via command line
>> like this:
>>
>> ./gradlew integrationTest -p sdks/java/io/mongodb
>>
>>
>>
>> * -DintegrationTestPipelineOptions='[   "--mongoDBHostName=1.2.3.4",
>>  "--mongoDBPort=27017",   "--mongoDBDatabaseName=mypass",
>>  "--numberOfRecords=1000" ]'*
>>--tests org.apache.beam.sdk.io.mongodb.MongoDbIOIT
>>-DintegrationTestRunner=direct
>>
>> Gradle tasks created with `enableJavaPerformanceTesting()` will allow
>> such options to be passed.
>>
>> If you're trying to do something else, please let me know.
>>
>> Regards
>> Michal
>>
>> On Thu, Nov 7, 2019 at 1:44 AM Kirill Kozlov 
>> wrote:
>>
>>> Hi everyone!
>>>
>>> I am trying to test MongoDb Sql Table, but not quite sure how to pass
>>> pipeline options with the hostName, port, and databaseName used by Jenkins.
>>>
>>> It looks like the integration test for MongoDbIO Connector obtain those
>>> values from the
>>> 'beam/.test-infra/jenkins/job_PerformanceTests_MongoDBIO_IT.groovy' file
>>> via calling the following methods in the 'gradle.build' file:
>>> provideIntegrationTestingDependencies()
>>> enableJavaPerformanceTesting()
>>>
>>> Sql build file already has a task with the name 'integrationTest'
>>> defined and does not let us do `enableJavaPerformanceTesting()`.
>>>
>>>  I would really appreciate if someone could provide me with a couple of
>>> pointers on getting this to work.
>>>
>>> -
>>> Kirill
>>>
>>
>>
>> --
>>
>> Michał Walenia
>> Polidea  | Software Engineer
>>
>> M: +48 791 432 002 <+48791432002>
>> E: michal.wale...@polidea.com
>>
>> Unique Tech
>> Check out our projects! 
>>
>

-- 

Michał Walenia
Polidea  | Software Engineer

M: +48 791 432 002 <+48791432002>
E: michal.wale...@polidea.com

Unique Tech
Check out our projects! 


Re: Beam runner statsd metrics

2019-11-08 Thread Maximilian Michels

Hi Kush,

Beam has its own Metrics[1] which are reported via the Flink metric 
system. You may want to use those and utilize the Flink statsd reporter: 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#statsd-orgapacheflinkmetricsstatsdstatsdreporter


Cheers,
Max

[1] 
https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/metrics/Metrics.html


On 08.11.19 11:31, Kush Kumar Sharma wrote:

Hi Devs!

I am trying to use a *Statsd* client in beam to export some runner 
metrics. I am able to extract out metrics from the base application but 
once the job is submitted to its runner(in this case I am using 
*FlinkRunner*), statsd client stops working. This is a streaming job and 
I need to extract out those metrics continuously(like how many files are 
getting processed, etc).


Are there any helper classes for this particular use case in beam? If 
not, are there any flink specific classes that I can use here? I have 
seen *MetricResults*(from *FlinkRunnerResult*) but it looks like I can 
only use that once the job is done not when in progress.


My current statsd client is https://github.com/tim-group/java-statsd-client

Regards
Kush Sharma


Re: [VOTE] @RequiresTimeSortedInput stateful DoFn annotation

2019-11-08 Thread Jan Lukavský

Hi Max,

thanks for comment. I probably should have put links to discussion 
threads here in the vote thread. Relevant would be


 - (a pretty lengthy) discussion about whether sorting by timestamp 
should be part of the model - [1]


 - part of the discussion related to the annotation - [2]

Regarding the open question in the design document - these are not meant 
to be open questions in regard to the design of the annotation and I'll 
remove that for now, as it is not (directly) related.


Now - main reason for this vote is that there is actually not a clear 
consensus in the ML thread. There are plenty of words like "should", 
"could", "would" and "maybe", so I wanted to be sure there is consensus 
to include this. I already run this in production for several months, so 
it is definitely useful for me. :-) But that might not be sufficient.


I'd be very happy to answer any more questions.

Thanks,

 Jan

[1] 
https://lists.apache.org/thread.html/4609a1bb1662690d67950e76d2f1108b51327b8feaf9580de659552e@%3Cdev.beam.apache.org%3E


[2] 
https://lists.apache.org/thread.html/dd9bec903102d9fcb4f390dc01513c0921eac1fedd8bcfdac630aaee@%3Cdev.beam.apache.org%3E


On 11/8/19 11:08 AM, Maximilian Michels wrote:

Hi Jan,

Disclaimer: I haven't followed the discussion closely, so I do not 
want to comment on the technical details of the feature here.


From the outside, it looks like there may be open questions. Also, we 
may need more motivation for what we can build with this feature or 
how it will become useful to users.


There are many threads in Beam and I believe we need to carefully 
prioritize the Beam feature set in order to focus on the things that 
provide the most value to our users.


Cheers,
Max

On 07.11.19 15:55, Jan Lukavský wrote:

Hi,
is there anything I can do to make this more attractive? :-) Any 
feedback would be much appreciated.

Many thanks,
  Jan

Dne 5. 11. 2019 14:10 napsal uživatel Jan Lukavský :

    Hi,

    I'd like to open a vote on accepting design document [1] as a 
base for

    implementation of @RequiresTimeSortedInput annotation for stateful
    DoFns. Associated JIRA [2] and PR [3] contains only subset of the 
whole
    functionality (allowed lateness ignored and no possibility to 
specify

    UDF for time - or sequential number - to be extracted from data).
    The PR
    will be subject to independent review process (please feel free to
    self-request review if you are interested in this) after the vote 
would
    eventually succeed. Missing features from the design document 
will be

    added later in subsequent JIRA issues, so that it doesn't block
    availability of this feature.

    Please vote on adding support for @RequiresTimeSortedInput.

    The vote is open for the next 72 hours and passes if at least 
three +1

    and no -1 PMC (binding) votes are cast.

    [ ] +1 Add support for @RequiresTimeSortedInput

    [ ] 0 I don't have a strong opinion about this, but I assume it's ok

    [ ] -1 Do not support @RequiresTimeSortedInput - please provide
    explanation.

    Thanks,

  Jan

    [1]
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing


    [2] https://issues.apache.org/jira/browse/BEAM-8550

    [3] https://github.com/apache/beam/pull/8774




Beam runner statsd metrics

2019-11-08 Thread Kush Kumar Sharma
Hi Devs!

I am trying to use a *Statsd* client in beam to export some runner metrics.
I am able to extract out metrics from the base application but once the job
is submitted to its runner(in this case I am using *FlinkRunner*), statsd
client stops working. This is a streaming job and I need to extract out
those metrics continuously(like how many files are getting processed, etc).

Are there any helper classes for this particular use case in beam? If not,
are there any flink specific classes that I can use here? I have seen
*MetricResults*(from *FlinkRunnerResult*) but it looks like I can only use
that once the job is done not when in progress.

My current statsd client is https://github.com/tim-group/java-statsd-client

Regards
Kush Sharma


Re: [Discuss] Beam Summit 2020 Dates & locations

2019-11-08 Thread Maximilian Michels
The dates sounds good to me. I agree that the bay area has an advantage 
because of its large tech community. On the other hand, it is a question 
of how we run the event. For Berlin we managed to get about 200 
attendees to Berlin, but for the BeamSummit in Las Vegas with ApacheCon 
the attendance was much lower.


Should this also be discussed on the user mailing list?

Cheers,
Max

On 07.11.19 22:50, Alex Van Boxel wrote:
For date wise, I'm wondering why we should switching the Europe and NA 
one, this would mean that the Berlin and the new EU summit would be 
almost 1.5 years apart.


  _/
_/ Alex Van Boxel


On Thu, Nov 7, 2019 at 8:43 PM Ahmet Altay > wrote:


I prefer bay are for NA summit. My reasoning is that there is a
criticall mass of contributors and users in that location, probably
more than alternative NA locations. I was not involved with planning
recently and I do not know if there were people who could attend due
to location previously. If that is the case, I agree with Elliotte
on looking for other options.

Related to dates: March (Asia) and mid-May (NA) dates are a bit
close. Mid-June for NA might be better to spread events. Other
pieces looks good.

Ahmet

On Thu, Nov 7, 2019 at 7:09 AM Elliotte Rusty Harold
mailto:elh...@ibiblio.org>> wrote:

The U.S. sadly is not a reliable destination for international
conferences these days. Almost every conference I go to, big and
small, has at least one speaker, sometimes more, who can't get into
the country. Canada seems worth considering. Vancouver,
Montreal, and
Toronto are all convenient.

On Wed, Nov 6, 2019 at 2:17 PM Griselda Cuevas mailto:g...@apache.org>> wrote:
 >
 > Hi Beam Community!
 >
 > I'd like to kick off a thread to discuss potential dates and
venues for the 2020 Beam Summits.
 >
 > I did some research on industry conferences happening in 2020
and pre-selected a few ranges as follows:
 >
 > (2 days) NA between mid-May and mid-June
 > (2 days) EU mid October
 > (1 day) Asia Mini Summit:  March
 >
 > I'd like to hear your thoughts on these dates and get
consensus on exact dates as the convo progresses.
 >
 > For locations these are the options I reviewed:
 >
 > NA: Austin Texas, Berkeley California, Mexico City.
 > Europe: Warsaw, Barcelona, Paris
 > Asia: Singapore
 >
 > Let the discussion begin!
 > G (on behalf of the Beam Summit Steering Committee)
 >
 >
 >


-- 
Elliotte Rusty Harold

elh...@ibiblio.org 



Re: Key encodings for state requests

2019-11-08 Thread jincheng sun
Hi,

Sorry for my late reply. It seems the conclusion has been reached. I just
want to share my personal thoughts.

Generally, both option 1 and 3 make sense to me.

>> The key concept here is not "standard coder" but "coder that the
>> runner does not understand." This knowledge is only in the runner.
>> Also has the downside of (2).

>Yes, I had assumed "non-standard" and "unknown" are the same, but the
>latter can be a subset of the former, i.e. if a Runner does not support
>all of the standard coders for some reason.

I'm also assume that "non-standard" and "unknown" are the same. Currently,
in the runner side[1] it
decides whether the coder is unknown(wrap with length prefix coder)
according to whether the coder is among
the standard coders. It will not communicate with harness to make this
decision.

So, from my point of view, we can update the PR according to option 1 or 3.

[1]
https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62

Maximilian Michels  于2019年11月8日周五 上午3:35写道:

> > While the Go SDK doesn't yet support a State API, Option 3) is what the
> Go SDK does for all non-standard coders (aka custom coders) anyway.
>
> For wire transfer, the Java Runner also adds a LengthPrefixCoder for the
> coder and its subcomponents. The problem is that this is an implicit
> assumption made. In the Proto, we do not have this represented. This is
> why **for state requests**, we end up with a
> "LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on
> the SDK Harness side. Note that the Python Harness does wrap unknown
> coders in a LengthPrefixCoder for transferring regular elements, but the
> LengthPrefixCoder is not preserved for the state requests.
>
> In that sense (3) is good because it follows this implicit notion of
> adding a LengthPrefixCoder for wire transfer, but applies it to state
> requests.
>
> However, option (1) is most reliable because the LengthPrefixCoder is
> actually in the Proto. So "CustomCoder" will always be represented as
> "LengthPrefixCoder[CustomCoder]", and only standard coders will be added
> without a LengthPrefixCoder.
>
> > I'd really like to avoid implicit agreements about how the coder that
> > should be used differs from what's specified in the proto in different
> > contexts.
>
> Option (2) would work on top of the existing logic because replacing a
> non-standard coder with a "NOOP coder" would just be used by the Runner
> to produce a serialized version of the key for partitioning. Flink
> always operates on the serialized key, be it standard or non-standard
> coder. It wouldn't be necessary to change any of the existing wire
> transfer logic or representation. I understand that it would be less
> ideal, but maybe easier to fix for the release.
>
> > The key concept here is not "standard coder" but "coder that the
> > runner does not understand." This knowledge is only in the runner.
> > Also has the downside of (2).
>
> Yes, I had assumed "non-standard" and "unknown" are the same, but the
> latter can be a subset of the former, i.e. if a Runner does not support
> all of the standard coders for some reason.
>
> > This means that the wire format that the runner sends for the "key"
> represents the exact same wire format it will receive for state requests.
>
> The wire format for the entire element is the same. Otherwise we
> wouldn't be able to process data between the Runner and the SDK Harness.
> However, the problem is that the way the Runner instantiates the key
> coder to partition elements, does not match how the SDK encodes the key
> when it sends a state request to the Runner. Conceptually, those two
> situations should be the same, but in practice they are not.
>
>
> Now that I thought about it again option (1) is probably the most
> explicit and in that sense cleanest. However, option (3) is kind of fair
> because it would just replicate the implicit LengthPrefixCoder behavior
> we have for general wire transfer also for state requests. Option (2) I
> suppose is the most implicit and runner-specific, should probably be
> avoided in the long run.
>
> So I'd probably opt for (1) and I would update the PR[1] rather soon
> because this currently blocks the release, as this is a regression from
> 2.16.0.[2]
>
>
> -Max
>
> [1] https://github.com/apache/beam/pull/9997
> [2] (In 2.16.0 it worked for Python because the Runner used a
> ByteArrayCoder with the OUTER encoding context for the key which was
> basically option (2). Only problem that, for standard coders the Java
> SDK Harness produced non-matching state request keys, due to it using
> the NESTED context.)
>
> On 07.11.19 18:01, Luke Cwik wrote:
> >
> >
> > On Thu, Nov 7, 2019 at 8:22 AM Robert Bradshaw  > > wrote:
> >
> > On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels  > > wrote:
> >  >
> 

Re: [VOTE] @RequiresTimeSortedInput stateful DoFn annotation

2019-11-08 Thread Maximilian Michels

Hi Jan,

Disclaimer: I haven't followed the discussion closely, so I do not want 
to comment on the technical details of the feature here.


From the outside, it looks like there may be open questions. Also, we 
may need more motivation for what we can build with this feature or how 
it will become useful to users.


There are many threads in Beam and I believe we need to carefully 
prioritize the Beam feature set in order to focus on the things that 
provide the most value to our users.


Cheers,
Max

On 07.11.19 15:55, Jan Lukavský wrote:

Hi,
is there anything I can do to make this more attractive? :-) Any 
feedback would be much appreciated.

Many thanks,
  Jan

Dne 5. 11. 2019 14:10 napsal uživatel Jan Lukavský :

Hi,

I'd like to open a vote on accepting design document [1] as a base for
implementation of @RequiresTimeSortedInput annotation for stateful
DoFns. Associated JIRA [2] and PR [3] contains only subset of the whole
functionality (allowed lateness ignored and no possibility to specify
UDF for time - or sequential number - to be extracted from data).
The PR
will be subject to independent review process (please feel free to
self-request review if you are interested in this) after the vote would
eventually succeed. Missing features from the design document will be
added later in subsequent JIRA issues, so that it doesn't block
availability of this feature.

Please vote on adding support for @RequiresTimeSortedInput.

The vote is open for the next 72 hours and passes if at least three +1
and no -1 PMC (binding) votes are cast.

[ ] +1 Add support for @RequiresTimeSortedInput

[ ] 0 I don't have a strong opinion about this, but I assume it's ok

[ ] -1 Do not support @RequiresTimeSortedInput - please provide
explanation.

Thanks,

  Jan

[1]

https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing


[2] https://issues.apache.org/jira/browse/BEAM-8550

[3] https://github.com/apache/beam/pull/8774