Re: Docker Run Options in SDK Container

2019-08-09 Thread Lukasz Cwik
On Fri, Aug 2, 2019 at 11:00 AM Chad Dombrova  wrote:

> Hi all,
> I’m a bit confused about the desire to use json for the environment_config.
>
Note that complex PipelineOptions are already expected to be in JSON
format[1, 2]. This has solved many string parsing and ambiguity issues.

> It’s harder to use json on the command line, such that now we’re talking
> about the value being *either* a docker image name *or* a path to a json
> file (OR maybe yaml too!), which is not only less convenient than just
> typing the docker args you want; it's also IMHO a dirty/inconsistent design.
>
> The idea of having each key of the json config map to a docker flag seems
> like a maintenance quagmire with little benefit.  In that case, Beam devs
> would have to maintain parity with docker options and be the arbiters of
> what's "safe" and what's not, and users would have to read additional beam
> documentation (which may or may not exist) to discover what keys are valid,
> rather than simply doing what they know, and passing the docker args.  As
> Sam points out, if security is a concern there are plenty of ways to abuse
> the system already. Security should be handled at the infrastructure
> deployment level, where it’s actually meaningful.
>
Wouldn't supporting every possible docker option also be a backwards
compatibility and portability quagmire?
Users could find that option X worked with Beam Y but no longer works with
Y+1, or that it works with runner A (which uses docker Z+1) but not with
runner B (which uses docker Z).

Both options have tradeoffs, and the important question is whether the
convenience of exposing every docker run option to users
outweighs the drawbacks.

> It also seems like there’s already a precedent for encoding environment
> configuration as command line args. Consider the SUBPROCESS_SDK environment:
>
> from apache_beam.options.pipeline_options import PipelineOptions, PortableOptions
> from apache_beam.portability import python_urns
>
> options = PipelineOptions()
> options.view_as(PortableOptions).environment_type = \
>     python_urns.SUBPROCESS_SDK
> options.view_as(PortableOptions).environment_config = \
>     b'/usr/bin/python -m apache_beam.runners.worker.sdk_worker_main'
>
> This could be encoded as json to avoid someone passing something nasty,
> but luckily that was *not* the choice that was made, because I think this
> is a fine design.
>
> As a result, I think the original proposal was the most elegant and
> consistent with other environment types:
>
> --environment_type DOCKER --environment_config "-v 
> /Volumes/mnt/foo:/Volumes/mnt/foo --user sambvfx MY_CONTAINER_NAME"
>
>
Note that the help docs for --environment_config heavily suggest that the
intent for the command was always JSON:
Set environment configuration for running the user code. For DOCKER: Url
for the docker image.\n For PROCESS: json of the form {"os": "<OS>",
"arch": "<ARCHITECTURE>", "command": "<process to execute>",
"env":{"<Environment variable>": "<ENV_VAL>"} }. All fields in the json
are optional except command.

1:
https://github.com/apache/beam/blob/24e9cedcc768d901de795477fa78c7f357635671/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java#L163
2:
https://github.com/apache/beam/blob/24e9cedcc768d901de795477fa78c7f357635671/sdks/python/apache_beam/options/pipeline_options.py#L822
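
For concreteness, a minimal sketch of both styles using the Python SDK's
PortableOptions (the image name, command path, and env values below are
illustrative assumptions, not canonical values):

    import json

    from apache_beam.options.pipeline_options import PipelineOptions, PortableOptions

    # DOCKER: environment_config is just the image URL.
    docker_options = PipelineOptions()
    docker_options.view_as(PortableOptions).environment_type = 'DOCKER'
    docker_options.view_as(PortableOptions).environment_config = (
        'apachebeam/python3.7_sdk:2.14.0')

    # PROCESS: environment_config is a JSON object; per the help text above,
    # every field except "command" is optional.
    process_options = PipelineOptions()
    process_options.view_as(PortableOptions).environment_type = 'PROCESS'
    process_options.view_as(PortableOptions).environment_config = json.dumps({
        'command': '/opt/apache/beam/boot',
        'env': {'WORKER_LOG_LEVEL': 'INFO'},
    })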


Re: Late data handling in Python SDK

2019-08-09 Thread Tanay Tummalapalli
Hi Lukasz,

I'm currently working on BEAM-7742[1].
I'll work on BEAM-3759 afterwards.

Regards,
- Tanay
[1] https://issues.apache.org/jira/browse/BEAM-7742

On Fri, Aug 9, 2019 at 9:25 PM Lukasz Cwik  wrote:

> +dev 
>
> Related JIRA's I found are BEAM-3759 and BEAM-7825. This has been a
> priority thing as the community has been trying to get streaming Python
> execution working on multiple Beam runners.
>
> On Wed, Aug 7, 2019 at 2:31 AM Sam Stephens 
> wrote:
>
>> Hi all,
>>
>> I’ve been reading into, and experimenting with, the Python SDK recently.
>> I see that late data handling is not supported currently, but I can’t find
>> where the progress of this feature is tracked: either in the portability
>> support matrix or in Beam’s Jira. Can anyone help me there?
>>
>> If the answer is that there isn’t a single place to see this, my
>> question is then: what is the current status of this? Is there some
>> underlying difficult problem that has to be solved first or is it just a
>> priority thing?
>>
>> Thanks
>> Sam
>>
>


Re: Beam Python Portable Runner - Adding timeout to JobServer grpc calls

2019-08-09 Thread enrico canzonieri
There seems to be consensus here around adding this feature. I filed
BEAM-7933 and assigned it
to myself. @Robert I'll check the places where it makes sense to re-use the
timeout value for RPCs.
I should be able to publish a pr sometime around next week.

Thanks,
Enrico

On Fri, Aug 9, 2019 at 12:41 AM Robert Bradshaw  wrote:

> If we do provide a configuration value for this, I would make it have a
> fairly large default and re-use the flag for all RPCs of a similar nature,
> rather than tweaking it for this particular service only.
>
> On Fri, Aug 9, 2019 at 2:58 AM Ahmet Altay  wrote:
>
>> Default plus a flag to override sounds reasonable. Although from Dataflow
>> experience I do not remember timeouts causing issues and each new added
>> flag adds complexity. What do others think?
>>
>> On Thu, Aug 8, 2019 at 11:38 AM Kyle Weaver  wrote:
>>
>>> If we do make a default, I still think it should be configurable via a
>>> flag. I can't think of why the prepare, stage artifact, job state, or job
>>> message requests might take more than 60 seconds, but you never know,
>>> particularly with artifact staging, which might be uploading artifacts to
>>> distributed storage.
>>>
>>> I assume the run request itself would not be subject to timeouts, as
>>> running the pipeline can be assumed to take significantly longer than the
>>> setup work.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>
>>>
>>> On Thu, Aug 8, 2019 at 11:20 AM Enrico Canzonieri 
>>> wrote:
>>>
 Default timeout with no flag may work as well.
 The main consideration here is whether some api calls may take longer
 than 60 seconds because of the complexity of the users' Beam pipeline. E.g.
 Could job_service.Prepare() take longer than 60 seconds if the given Beam
 pipeline is extremely complex?

 Basically if there are cases when the user code may cause the call
 duration to increase to the point the timeout prevents submitting the app
 itself then we should consider having a flag.

 On 2019/08/07 20:13:12, Ahmet Altay wrote:
 > Could we pick a default timeout value instead of introducing a flag?
 > We use 60 seconds as the default timeout for the http client [1], we
 > can do the same here.
 >
 > [1]
 > https://github.com/apache/beam/blob/3a182d64c86ad038692800f5c343659ab0b935b0/sdks/python/apache_beam/internal/http_client.py#L32
 >
 > On Wed, Aug 7, 2019 at 11:53 AM enrico canzonieri wrote:
 >
 > > Hello,
 > >
 > > I noticed that the calls to the JobServer from the Python SDK do
 > > not have timeouts. If I'm not mistaken that means that the call to
 > > pipeline.run() could hang forever if the JobServer is not running
 > > (or failing to start). E.g.
 > > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307
 > > the call to Prepare() doesn't provide any timeout value and the same
 > > applies to other JobServer requests.
 > > I was considering adding a --job-server-request-timeout to the
 > > PortableOptions class to be used in the JobServer interactions inside
 > > portable_runner.py.
 > > Is there any specific reason why the timeout is not currently
 > > supported? Does anybody have any objection adding the jobserver
 > > timeout? I could volunteer to file a ticket and submit a pr for this.
 > >
 > > Cheers,
 > > Enrico Canzonieri
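
For illustration, a minimal sketch of what a per-request timeout on the job
service stub could look like (the 60-second default, the address, and the
surrounding wiring are assumptions, not the final design; the pipeline proto
construction is elided):

    import grpc

    from apache_beam.portability.api import beam_job_api_pb2
    from apache_beam.portability.api import beam_job_api_pb2_grpc

    JOB_SERVER_REQUEST_TIMEOUT_SECS = 60  # assumed default, mirroring the http client

    channel = grpc.insecure_channel('localhost:8099')
    job_service = beam_job_api_pb2_grpc.JobServiceStub(channel)
    try:
        # With a per-call timeout, Prepare() can no longer hang forever when
        # the job server is down or failing to start.
        prepare_response = job_service.Prepare(
            beam_job_api_pb2.PrepareJobRequest(
                job_name='job', pipeline=pipeline_proto),
            timeout=JOB_SERVER_REQUEST_TIMEOUT_SECS)
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            raise RuntimeError('Job server did not respond within %d seconds' %
                               JOB_SERVER_REQUEST_TIMEOUT_SECS)
        raise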




Re: Late data handling in Python SDK

2019-08-09 Thread Lukasz Cwik
+dev 

Related JIRA's I found are BEAM-3759 and BEAM-7825. This has been a
priority thing as the community has been trying to get streaming Python
execution working on multiple Beam runners.

On Wed, Aug 7, 2019 at 2:31 AM Sam Stephens  wrote:

> Hi all,
>
> I’ve been reading into, and experimenting with, the Python SDK recently. I
> see that late data handling is not supported currently, but I can’t find
> where the progress of this feature is tracked: either in the portability
> support matrix or in Beam’s Jira. Can anyone help me there?
>
> If the answer is that there isn’t a single place to see this, my
> question is then: what is the current status of this? Is there some
> underlying difficult problem that has to be solved first or is it just a
> priority thing?
>
> Thanks
> Sam
>


Re: Write-through-cache in State logic

2019-08-09 Thread Lukasz Cwik
On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw  wrote:

> The question is whether the SDK needs to wait for the StateResponse to
> come back before declaring the bundle done. The proposal was to not
> send the cache token back as part of an append StateResponse [1], but
> pre-provide it as part of the bundle request.
>

Agreed, the I'm Blocked message is intended to occur during bundle
processing.


> Thinking about this some more, if we assume the state response was
> successfully applied, there's no reason for the SDK to block the
> bundle until it has its hands on the cache token--we can update the
> cache once the StateResponse comes back whether or not the bundle is
> still active. On the other hand, the runner needs a way to assert it
> has received and processed all StateRequests from the SDK associated
> with a bundle before it can declare the bundle complete (regardless of
> the cache tokens), so this might not be safe without some extra
> coordination (e.g. the ProcessBundleResponse indicating the number of
> state requests associated with a bundle).
>

Since the state request stream is ordered, we can add the id of the last
state request as part of the ProcessBundleResponse.


> [1]
> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627
>
> On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik  wrote:
> >
> > The purpose of the new state API call in BEAM-7000 is to tell the runner
> that the SDK is now blocked waiting for the result of a specific state
> request and it should be used for fetches (not updates) and is there to
> allow for SDKs to differentiate readLater (I will need this data at some
> point in time in the future) from read (I need this data now). This comes
> up commonly where the user prefetches multiple state cells and then looks
> at their content allowing the runner to batch up those calls on its end.
> >
> > The way it can be used for clear+append is that the runner can store
> requests in memory up until some time/memory limit or until it gets its
> first "blocked" call and then issue all the requests together.
> >
> >
> > On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw 
> wrote:
> >>
> >> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise  wrote:
> >> >
> >> > That would add a synchronization point that forces extra latency
> especially in streaming mode.
> >> >
> >> > Wouldn't it be possible for the runner to assign the token when
> starting the bundle and for the SDK to pass it along the state requests?
> That way, there would be no need to batch and wait for a flush.
> >>
> >> I think it makes sense to let the runner pre-assign these state update
> >> tokens rather than forcing a synchronization point.
> >>
> >> Here are some pointers for the Python implementation:
> >>
> >> Currently, when a DoFn needs UserState, a StateContext object is used
> >> that converts from a StateSpec to the actual value. When running
> >> portably, this is FnApiUserStateContext [1]. The state handles
> >> themselves are cached at [2] but this context only lives for the
> >> lifetime of a single bundle. Logic could be added here to use the
> >> token to share these across bundles.
> >>
> >> Each of these handles in turn invokes state_handler.get* methods when
> >> its read is called. (Here state_handler is a thin wrapper around the
> >> service itself) and constructs the appropriate result from the
> >> StateResponse. We would need to implement caching at this level as
> >> well, including the deserialization. This will probably require some
> >> restructuring of how _StateBackedIterable is implemented (or,
> >> possibly, making that class itself cache aware). Hopefully that's
> >> enough to get started.
> >>
> >> [1]
> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
> >> [2]
> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
> >> .
> >>
> >> > On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik  wrote:
> >> >>
> >> >> I believe the intent is to add a new state API call telling the
> runner that it is blocked waiting for a response (BEAM-7000).
> >> >>
> >> >> This should allow the runner to wait till it sees one of these I'm
> blocked requests and then merge + batch any state calls it may have at that
> point in time allowing it to convert clear + appends into set calls and do
> any other optimizations as well. By default, the runner would have a time
> and space based limit on how many outstanding state calls there are before
> choosing to resolve them.
> >> >>
> >> >> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik  wrote:
> >> >>>
> >> >>> Now I see what you mean.
> >> >>>
> >> >>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise  wrote:
> >> 
> >>  Hi Luke,
> >> 
> >>  I guess the answer is that it depends on the state backend. If a
> set operation in the state backend is available that is more efficient than
> clear+append, then it would be beneficial to have a dedicated fn api
> operation to allow for such optimization. That's something that needs to be
> determined with a profiler :) But the low hanging fruit is cross-bundle
> caching.
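
To make the cross-bundle caching idea concrete, a rough sketch in Python
(the class and method names are illustrative assumptions, not the actual
SDK types):

    class CrossBundleStateCache(object):
        """Illustrative write-through cache keyed by (cache_token, state_key)."""

        def __init__(self):
            self._cache = {}

        def get(self, cache_token, state_key, fetch_fn):
            # A hit avoids a round trip to the runner's state service.
            key = (cache_token, state_key)
            if key not in self._cache:
                self._cache[key] = fetch_fn()  # falls back to state_handler.get*
            return self._cache[key]

        def append(self, cache_token, state_key, value, append_fn):
            # Write-through: update the local copy, then issue the append
            # StateRequest without blocking the bundle on the StateResponse.
            key = (cache_token, state_key)
            if key in self._cache:
                self._cache[key].append(value)
            append_fn(value)

        def invalidate(self, cache_token):
            # Called when the runner hands out a new token for a state cell.
            self._cache = {
                k: v for k, v in self._cache.items() if k[0] != cache_token}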

Re: Allowing firewalled/offline builds of Beam

2019-08-09 Thread Robert Burke
If the work to switch to using Go Modules under gogradle works, then it
should be possible to use a proxy hosted inside the firewall for the go
packages, rather than the vendoring directories.

On Thu, Aug 8, 2019, 11:17 AM Lukasz Cwik  wrote:

> Udi beat me by a couple of mins.
>
> We build a good portion of the Beam Java codebase internally within Google
> by bypassing the gradle wrapper (gradlew) and executing the gradle command
> from a full gradle installation at the root of a copy of the Beam codebase.
>
> It does require your internal build system to use a version of gradle that
> is compatible with the version[1] that gradlew uses and you could create a
> wrapper that figures out which version of gradle to use and select the
> appropriate one from many local gradle installations. This should allow you
> to bypass the gradlew script entirely and any downloading it does.
>
> Note that gradle does support a --offline flag which we also use to ensure
> that it doesn't pull stuff from the internet. Not sure if all the plugins
> honor it but it works well enough for us to build most of the Beam Java
> codebase with it.
>
> 1:
> https://github.com/apache/beam/blob/497bc77c0d53098887156a014a659184097ef021/gradle/wrapper/gradle-wrapper.properties#L20
>
> On Thu, Aug 8, 2019 at 11:15 AM Udi Meiri  wrote:
>
>> You can download it here: https://gradle.org/releases/
>> and run it instead of using the wrapper.
>>
>> Example:
>> $ cd
>> $ unzip Downloads/gradle-5.5.1-bin.zip
>> $ cd ~/src/beam
>> $ ~/gradle-5.5.1/bin/gradle lint
>>
>>
>> On Thu, Aug 8, 2019 at 10:52 AM Chad Dombrova  wrote:
>>
>>> This topic came up in another thread, so I wanted to highlight a few
>>> things that we've discovered in our endeavors to build Beam behind a
>>> firewall.
>>>
>>> Conceptually, in order to allow this, a user needs to provide alternate
>>> mirrors for each "artifact" service required during build, and luckily I
>>> think most of the toolchains used by Beam support this. For example, the
>>> default PyPI mirror used by pip can be overridden via env var to an
>>> internal mirror, and likewise for docker and its registry service.  I'm
>>> currently looking into gogradle to see if we can provide an alternate
>>> vendor directory as a shared resource behind our firewall. (I have a bigger
>>> question here, which is why it was necessary to add a third language into
>>> the python Beam ecosystem, just for the bootstrap process?  Couldn't the
>>> boot code use python, or java?)
>>>
>>> But I'm getting ahead of myself.  We're actually stuck at the very
>>> beginning, with gradlew.  The gradlew wrapper seems to unconditionally
>>> download gradle, so you can't get past the first few hundred lines of code
>>> in the build process without requiring internet access.  I made a ticket
>>> here: https://issues.apache.org/jira/browse/BEAM-7931.  I'd love some
>>> pointers on how to fix this, because the offending code lives inside
>>> gradle-wrapper.jar, so I can't change it without access to the source.
>>>
>>> thanks,
>>> -chad
>>>
>>>


Re: Java 11 compatibility question

2019-08-09 Thread Elliotte Rusty Harold
Another useful thing to improve Java 11 support is to add
Automatic-Module-Name headers to the various JARs Beam publishes:

https://github.com/GoogleCloudPlatform/cloud-opensource-java/blob/master/library-best-practices/JLBP-20.md

If a JAR doesn't include this, Java synthesizes one from the name of the
jar file, and things get wonky fast. This is a low-risk change that has no
effect on non-modular and pre-Java-9 apps.


On Wed, Aug 7, 2019 at 9:41 AM Michał Walenia 
wrote:

> Hi everyone,
>
> I want to gather the collective knowledge here about Java 11 compatibility
> and ask about the tests needed to deem Beam compatible with JDK 11.
>
> Right now concerning testing JDK 11 compatibility I implemented:
>
>- Jenkins jobs for running ValidatesRunner test sets in both Direct
>and Dataflow runners, [1], [2]
>- ValidatesRunner portability API tests for Dataflow [3],
>- examples in normal and portable mode for the Dataflow runner [4],
>[5].
>
>
> Are these tests sufficient to say that we’re java 11 compatible? What
> other aspects do we need to test to be able to say that?
>
>
> Regards,
>
>
> Michał
>
> [1]
> https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Direct/
> [2]
> https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Dataflow/
> [3]
> https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow/
> [4]
> https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow/
> [5]
> https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_Portability/
>
> --
>
> Michał Walenia
> Polidea  | Software Engineer
>
> M: +48 791 432 002 <+48791432002>
> E: michal.wale...@polidea.com
>
> Unique Tech
> Check out our projects! 
>


-- 
Elliotte Rusty Harold
elh...@ibiblio.org


Re: Java 11 compatibility question

2019-08-09 Thread Robert Bradshaw
On Fri, Aug 9, 2019 at 12:48 PM Michał Walenia 
wrote:

> From what I understand, the Java 8 -> 11 testing isn't in essence similar
> to py2 -> py3 checks.
>

True. Python 3 is in many ways a new language, and much less (and more
subtly) backwards compatible. You also can't "link" Python 3 code against
Python 2 code the way you can use old Java classes in new JVMs.


> In the case of Java, all we want to do is check if Beam downloaded by
> users from Maven (and compiled with JDK8) won't act up if used from a
> JDK/JRE 11 environment. We don't want to migrate the tool itself to a newer
> language version. As I mentioned in my previous email, there already are
> test suites checking compatibility - ValidatesRunner on Direct and Dataflow
> runners running in normal and portable mode.
> Those tests keep passing, so I believe we're mostly fine regarding
> compatibility.
> All I want to know is - is this enough?
> How else can we test Beam to be sure it works in JRE 11? After several
> accidental launches of build tasks in JDK 11, I am sure that it's not
> buildable with it, but this is not the compatibility type we want to check.
>

Well, we will want this eventually. Before that, we'll want to be sure
users can build their Java 11 code against our artifacts.


>
> Thank you for your replies,
> Michal
>
>
> On Thu, Aug 8, 2019 at 10:25 PM Valentyn Tymofieiev 
> wrote:
>
>> From Python 3 migration standpoint, some high level pillars that increase
>> our confidence are:
>> - Test coverage: (PreCommit, PostCommit), creating a system to make it
>> easy to add test coverage in the new language for new functionality.
>> - Support of new language version by core runners + ValidatesRunner test
>> coverage.
>> - Test of time: offer new functionality in a few releases, monitor &
>> address user feedback.
>>
>> Dependency audit and critical feature support in the new language, as
>> mentioned by others, are important points. If you are curious about
>> detailed AIs that went into Python 3 support, feel free to look into
>> BEAM-1251 or Py3 Kanban Board (
>> https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=245&view=detail
>> ).
>>
>> Thanks,
>> Valentyn
>>
>>
>> On Thu, Aug 8, 2019 at 7:24 PM Mark Liu  wrote:
>>
>>> Some actions we did for py2 to py3 works:
>>> - Check and resolve incompatible dependencies.
>>> - Enable py3 lint.
>>> - Fill feature gaps between py2 and py3 (e.g. new py3 container, new
>>> solution for type hint)
>>> - Add unit tests, integration tests and other tests on py3 for coverage.
>>> - Release (p3) and deprecation (p2) plan.
>>>
>>> Hope this helps on Java upgrade.
>>>
>>> Mark
>>>
>>> On Wed, Aug 7, 2019 at 3:19 PM Ahmet Altay  wrote:
>>>


 On Wed, Aug 7, 2019 at 12:21 PM Elliotte Rusty Harold <
 elh...@ibiblio.org> wrote:

> gRPC bug here: https://github.com/grpc/grpc-java/issues/3522
>
> google-cloud-java bug:
> https://github.com/googleapis/google-cloud-java/issues/5760
>
> Neither has a cheap or easy fix, I'm afraid. Commenting on these
> issues might help us prove that there's a demand to prioritize these
> compared to other work. If anyone has a support contract and could
> file a ticket asking for a fix, that would help even more.
>
> Those are the two I know about. There might be others elsewhere in the
> dependency tree.
>
>
> On Wed, Aug 7, 2019 at 2:25 PM Lukasz Cwik  wrote:
> >
> > Since java8 -> java11 is similar to python2 -> python3 migration,
> what was the acceptance criteria there?
>

 I do not remember formally discussing this. The bar used was, all
 existing tests will pass for python2 and python3. New tests will be added
 for python3 specific features. (To avoid any confusion this bar has not
 been cleared yet.)

 cc: +Valentyn Tymofieiev  could add more details.


> >
> > On Wed, Aug 7, 2019 at 1:54 PM Elliotte Rusty Harold <
> elh...@ibiblio.org> wrote:
> >>
> >>
> >>
> >> On Wed, Aug 7, 2019 at 9:41 AM Michał Walenia <
> michal.wale...@polidea.com> wrote:
> >>>
> >>>
> >>> Are these tests sufficient to say that we’re java 11 compatible?
> What other aspects do we need to test to be able to say that?
> >>>
> >>>
> >>
> >> Are any packages split across multiple jar files, including
> packages beam depends on? That's the one that's bitten some other
> projects, including google-cloud-java and gRPC. If so, beam is not going 
> to
> work with the module system.
> >>
> >> Work is ongoing to fix split packages in both gRPC and
> google-cloud-java, but we're not very far down that path and I think it's
> going to be an API breaking change.
> >>
> > Romain pointed this out earlier and I fixed the last case of
> packages being split across multiple jars within Apache Beam but as you
> point out our transitive dependencies are not ready.

Re: Java 11 compatibility question

2019-08-09 Thread Michał Walenia
From what I understand, the Java 8 -> 11 testing isn't in essence similar
to py2 -> py3 checks.
In the case of Java, all we want to do is check if Beam downloaded by users
from Maven (and compiled with JDK8) won't act up if used from a JDK/JRE 11
environment. We don't want to migrate the tool itself to a newer language
version. As I mentioned in my previous email, there already are test suites
checking compatibility - ValidatesRunner on Direct and Dataflow runners
running in normal and portable mode.
Those tests keep passing, so I believe we're mostly fine regarding
compatibility.
All I want to know is - is this enough?
How else can we test Beam to be sure it works in JRE 11? After several
accidental launches of build tasks in JDK 11, I am sure that it's not
buildable with it, but this is not the compatibility type we want to check.

Thank you for your replies,
Michal


On Thu, Aug 8, 2019 at 10:25 PM Valentyn Tymofieiev 
wrote:

> From Python 3 migration standpoint, some high level pillars that increase
> our confidence are:
> - Test coverage: (PreCommit, PostCommit), creating a system to make it
> easy to add test coverage in the new language for new functionality.
> - Support of new language version by core runners + ValidatesRunner test
> coverage.
> - Test of time: offer new functionality in a few releases, monitor &
> address user feedback.
>
> Dependency audit and critical feature support in the new language, as
> mentioned by others, are important points. If you are curious about
> detailed AIs that went into Python 3 support, feel free to look into
> BEAM-1251 or Py3 Kanban Board (
> https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=245&view=detail
> ).
>
> Thanks,
> Valentyn
>
>
> On Thu, Aug 8, 2019 at 7:24 PM Mark Liu  wrote:
>
>> Some actions we did for py2 to py3 works:
>> - Check and resolve incompatible dependencies.
>> - Enable py3 lint.
>> - Fill feature gaps between py2 and py3 (e.g. new py3 container, new
>> solution for type hint)
>> - Add unit tests, integration tests and other tests on py3 for coverage.
>> - Release (p3) and deprecation (p2) plan.
>>
>> Hope this helps on Java upgrade.
>>
>> Mark
>>
>> On Wed, Aug 7, 2019 at 3:19 PM Ahmet Altay  wrote:
>>
>>>
>>>
>>> On Wed, Aug 7, 2019 at 12:21 PM Elliotte Rusty Harold <
>>> elh...@ibiblio.org> wrote:
>>>
 gRPC bug here: https://github.com/grpc/grpc-java/issues/3522

 google-cloud-java bug:
 https://github.com/googleapis/google-cloud-java/issues/5760

 Neither has a cheap or easy fix, I'm afraid. Commenting on these
 issues might help us prove that there's a demand to prioritize these
 compared to other work. If anyone has a support contract and could
 file a ticket asking for a fix, that would help even more.

 Those are the two I know about. There might be others elsewhere in the
 dependency tree.


 On Wed, Aug 7, 2019 at 2:25 PM Lukasz Cwik  wrote:
 >
 > Since java8 -> java11 is similar to python2 -> python3 migration,
 what was the acceptance criteria there?

>>>
>>> I do not remember formally discussing this. The bar used was, all
>>> existing tests will pass for python2 and python3. New tests will be added
>>> for python3 specific features. (To avoid any confusion this bar has not
>>> been cleared yet.)
>>>
>>> cc: +Valentyn Tymofieiev  could add more details.
>>>
>>>
 >
 > On Wed, Aug 7, 2019 at 1:54 PM Elliotte Rusty Harold <
 elh...@ibiblio.org> wrote:
 >>
 >>
 >>
 >> On Wed, Aug 7, 2019 at 9:41 AM Michał Walenia <
 michal.wale...@polidea.com> wrote:
 >>>
 >>>
 >>> Are these tests sufficient to say that we’re java 11 compatible?
 What other aspects do we need to test to be able to say that?
 >>>
 >>>
 >>
 >> Are any packages split across multiple jar files, including packages
 beam depends on? That's the one that's bitten some other projects,
 including google-cloud-java and gRPC. If so, beam is not going to work with
 the module system.
 >>
 >> Work is ongoing to fix split packages in both gRPC and
 google-cloud-java, but we're not very far down that path and I think it's
 going to be an API breaking change.
 >>
 > Romain pointed this out earlier and I fixed the last case of packages
 being split across multiple jars within Apache Beam but as you point out
 our transitive dependencies are not ready.
 >>
 >>
 >> --
 >> Elliotte Rusty Harold
 >> elh...@ibiblio.org



 --
 Elliotte Rusty Harold
 elh...@ibiblio.org

>>>

-- 

Michał Walenia
Polidea  | Software Engineer

M: +48 791 432 002 <+48791432002>
E: michal.wale...@polidea.com

Unique Tech
Check out our projects! 


Re: Write-through-cache in State logic

2019-08-09 Thread Robert Bradshaw
The question is whether the SDK needs to wait for the StateResponse to
come back before declaring the bundle done. The proposal was to not
send the cache token back as part of an append StateResponse [1], but
pre-provide it as part of the bundle request.

Thinking about this some more, if we assume the state response was
successfully applied, there's no reason for the SDK to block the
bundle until it has its hands on the cache token--we can update the
cache once the StateResponse comes back whether or not the bundle is
still active. On the other hand, the runner needs a way to assert it
has received and processed all StateRequests from the SDK associated
with a bundle before it can declare the bundle complete (regardless of
the cache tokens), so this might not be safe without some extra
coordination (e.g. the ProcessBundleResponse indicating the number of
state requests associated with a bundle).

[1] 
https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627

On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik  wrote:
>
> The purpose of the new state API call in BEAM-7000 is to tell the runner that 
> the SDK is now blocked waiting for the result of a specific state request and 
> it should be used for fetches (not updates) and is there to allow for SDKs to 
> differentiate readLater (I will need this data at some point in time in the 
> future) from read (I need this data now). This comes up commonly where the 
> user prefetches multiple state cells and then looks at their content allowing 
> the runner to batch up those calls on its end.
>
> The way it can be used for clear+append is that the runner can store requests 
> in memory up until some time/memory limit or until it gets its first 
> "blocked" call and then issue all the requests together.
>
>
> On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw  wrote:
>>
>> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise  wrote:
>> >
>> > That would add a synchronization point that forces extra latency 
>> > especially in streaming mode.
>> >
>> > Wouldn't it be possible for the runner to assign the token when starting 
>> > the bundle and for the SDK to pass it along the state requests? That way, 
>> > there would be no need to batch and wait for a flush.
>>
>> I think it makes sense to let the runner pre-assign these state update
>> tokens rather than forcing a synchronization point.
>>
>> Here are some pointers for the Python implementation:
>>
>> Currently, when a DoFn needs UserState, a StateContext object is used
>> that converts from a StateSpec to the actual value. When running
>> portably, this is FnApiUserStateContext [1]. The state handles
>> themselves are cached at [2] but this context only lives for the
>> lifetime of a single bundle. Logic could be added here to use the
>> token to share these across bundles.
>>
>> Each of these handles in turn invokes state_handler.get* methods when
>> its read is called. (Here state_handler is a thin wrapper around the
>> service itself) and constructs the appropriate result from the
>> StateResponse. We would need to implement caching at this level as
>> well, including the deserialization. This will probably require some
>> restructuring of how _StateBackedIterable is implemented (or,
>> possibly, making that class itself cache aware). Hopefully that's
>> enough to get started.
>>
>> [1] 
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
>> [2] 
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
>> .
>>
>> > On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik  wrote:
>> >>
>> >> I believe the intent is to add a new state API call telling the runner 
>> >> that it is blocked waiting for a response (BEAM-7000).
>> >>
>> >> This should allow the runner to wait till it sees one of these I'm 
>> >> blocked requests and then merge + batch any state calls it may have at 
>> >> that point in time allowing it to convert clear + appends into set calls 
>> >> and do any other optimizations as well. By default, the runner would have 
>> >> a time and space based limit on how many outstanding state calls there 
>> >> are before choosing to resolve them.
>> >>
>> >> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik  wrote:
>> >>>
>> >>> Now I see what you mean.
>> >>>
>> >>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise  wrote:
>> 
>>  Hi Luke,
>> 
>>  I guess the answer is that it depends on the state backend. If a set 
>>  operation in the state backend is available that is more efficient than 
>>  clear+append, then it would be beneficial to have a dedicated fn api 
>>  operation to allow for such optimization. That's something that needs 
>>  to be determined with a profiler :)
>> 
>>  But the low hanging fruit is cross-bundle caching.
>> 
>>  Thomas
>> 
>>  On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik  wrote:
>> >

Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread Jan Lukavský

Hi Rahul,

I cannot tell for sure. The fix was applied in runners-core, so - 
technically - multiple runners could have been affected. A 
runner would be affected if, and only if, it used something that 
depends on hashCode() of StateTag (or StateSpec) and the user used a 
Coder for that state that doesn't correctly implement hashCode() and 
equals() - SchemaCoder is one such example.


After a few greps on the repository, I think it might be possible 
that the Dataflow runner would be (more or less) affected by this as well 
(but someone from the Dataflow team could confirm or disprove that better 
than me). Possibly affected code is in WindmillStateReader.java, which 
uses a ConcurrentHashMap with StateTag as key. I'm not able to tell the 
consequences of that. I didn't find any obvious uses of HashMap or 
HashSet of StateTags in other runners. But that doesn't mean there 
really isn't any. :-)


Either way, by using version 2.14.0 you should be safe on all runners.
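
To illustrate the failure mode, a minimal Python analogue (illustrative
only; the actual bug involved Java's StateTag and SchemaCoder):

    class SchemaCoderWithoutEquality(object):
        # No __eq__/__hash__, so two logically identical coders hash
        # differently (like a Java class missing equals()/hashCode()).
        def __init__(self, schema):
            self.schema = schema

    state_cells = {}
    state_cells[SchemaCoderWithoutEquality('my_schema')] = ['element-1']

    # A second, logically identical coder misses the existing entry, so a
    # lookup keyed on it silently sees an empty state cell:
    lookup = SchemaCoderWithoutEquality('my_schema')
    print(lookup in state_cells)  # False: elements appear to go "missing"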

Jan

On 8/9/19 10:59 AM, rahul patwari wrote:

Hi Jan,

I was using Beam 2.13.0. I have upgraded Beam version to 2.14.0 and 
the results are always correct. No more inconsistencies.


Does BEAM-7269 affect all the runners?

Thanks,
Rahul

On Fri, Aug 9, 2019 at 2:15 PM Jan Lukavský wrote:


Hi Rahul,

what version of Beam are you using? There was a bug [1], which was
fixed in 2.14.0. This bug could cause what you observe.

Jan

[1] https://issues.apache.org/jira/browse/BEAM-7269

On 8/9/19 10:35 AM, rahul patwari wrote:

Hi Robert,

When PCollection is created using
"Create.of(listOfRow)*.withCoder(RowCoder.of(schema))*", I am
getting "Inconsistent" results.
By "Inconsistent", I mean that the result is "Incorrect"
sometimes(most of the times).
By "Incorrect" result, I mean that the elements are missing. The
elements are not duplicated. The elements are not batched
differently.

I have used System.identityHashCode(this) to convert
PCollection<Row> to PCollection<KV<Integer, Row>> to apply
Stateful Pardo(GroupIntoBatches) as per your suggestion in this
thread



To verify the result, I have used GroupByKey, which should give
the same result as GroupIntoBatches *for my case*.

However, When PCollection is created using
"Create.of(listOfRow)", the results are always correct.

Regards,
Rahul

On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw <rober...@google.com> wrote:

Could you clarify what you mean by "inconsistent" and
"incorrect"? Are
elements missing/duplicated, or just batched differently?

On Fri, Aug 9, 2019 at 2:18 AM rahul patwari <rahulpatwari8...@gmail.com> wrote:
>
> I only ran in Direct runner. I will run in other runners
and let you know the results.
> I am not setting "streaming" when executing.
>
> On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>> Have you tried running this on more than one runner (e.g.
Dataflow, Flink, Direct)?
>>
>> Are you setting --streaming when executing?
>>
>> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <rahulpatwari8...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am getting inconsistent results when using
GroupIntoBatches PTransform.
>>> I am using Create.of() PTransform to create a PCollection
from in-memory. When a coder is given with Create.of()
PTransform, I am facing the issue.
>>> If the coder is not provided, the results are consistent
and correct(Maybe this is just a coincidence and the problem
is at some other place).
>>> If Batch Size is 1, results are always consistent.
>>>
>>> Not sure if this is an issue with
Serialization/Deserialization (or) GroupIntoBatches (or)
Create.of() PTransform.
>>>
>>> The Java code, expected correct results, and inconsistent
results are available at
https://github.com/rahul8383/beam-examples
>>>
>>> Thanks,
>>> Rahul



Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread rahul patwari
Hi Jan,

I was using Beam 2.13.0. I have upgraded Beam version to 2.14.0 and the
results are always correct. No more inconsistencies.

Does BEAM-7269 affect all the runners?

Thanks,
Rahul

On Fri, Aug 9, 2019 at 2:15 PM Jan Lukavský  wrote:

> Hi Rahul,
>
> what version of Beam are you using? There was a bug [1], which was fixed
> in 2.14.0. This bug could cause what you observe.
>
> Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7269
> On 8/9/19 10:35 AM, rahul patwari wrote:
>
> Hi Robert,
>
> When PCollection is created using "Create.of(listOfRow)
> *.withCoder(RowCoder.of(schema))*", I am getting "Inconsistent" results.
> By "Inconsistent", I mean that the result is "Incorrect" sometimes(most of
> the times).
> By "Incorrect" result, I mean that the elements are missing. The elements
> are not duplicated. The elements are not batched differently.
>
> I have used System.identityHashCode(this) to convert PCollection<Row> to
> PCollection<KV<Integer, Row>> to apply Stateful Pardo(GroupIntoBatches) as
> per your suggestion in this thread
> 
>
> To verify the result, I have used GroupByKey, which should give the
> same result as GroupIntoBatches *for my case*.
>
> However, When PCollection is created using "Create.of(listOfRow)", the
> results are always correct.
>
> Regards,
> Rahul
>
> On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw 
> wrote:
>
>> Could you clarify what you mean by "inconsistent" and "incorrect"? Are
>> elements missing/duplicated, or just batched differently?
>>
>> On Fri, Aug 9, 2019 at 2:18 AM rahul patwari 
>> wrote:
>> >
>> > I only ran in Direct runner. I will run in other runners and let you
>> know the results.
>> > I am not setting "streaming" when executing.
>> >
>> > On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik,  wrote:
>> >>
>> >> Have you tried running this on more than one runner (e.g. Dataflow,
>> Flink, Direct)?
>> >>
>> >> Are you setting --streaming when executing?
>> >>
>> >> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <
>> rahulpatwari8...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I am getting inconsistent results when using GroupIntoBatches
>> PTransform.
>> >>> I am using Create.of() PTransform to create a PCollection from
>> in-memory. When a coder is given with Create.of() PTransform, I am facing
>> the issue.
>> >>> If the coder is not provided, the results are consistent and
>> correct(Maybe this is just a coincidence and the problem is at some other
>> place).
>> >>> If Batch Size is 1, results are always consistent.
>> >>>
>> >>> Not sure if this is an issue with Serialization/Deserialization (or)
>> GroupIntoBatches (or) Create.of() PTransform.
>> >>>
>> >>> The Java code, expected correct results, and inconsistent results are
>> available at https://github.com/rahul8383/beam-examples
>> >>>
>> >>> Thanks,
>> >>> Rahul
>>
>


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread Jan Lukavský

Hi Rahul,

what version of Beam are you using? There was a bug [1], which was fixed 
in 2.14.0. This bug could cause what you observe.


Jan

[1] https://issues.apache.org/jira/browse/BEAM-7269

On 8/9/19 10:35 AM, rahul patwari wrote:

Hi Robert,

When PCollection is created using 
"Create.of(listOfRow)*.withCoder(RowCoder.of(schema))*", I am getting 
"Inconsistent" results.
By "Inconsistent", I mean that the result is "Incorrect" 
sometimes(most of the times).
By "Incorrect" result, I mean that the elements are missing. The 
elements are not duplicated. The elements are not batched differently.


I have used System.identityHashCode(this) to convert PCollection<Row> 
to PCollection<KV<Integer, Row>> to apply Stateful 
Pardo(GroupIntoBatches) as per your suggestion in this thread

To verify the result, I have used GroupByKey, which should give the 
same result as GroupIntoBatches *for my case*.


However, When PCollection is created using "Create.of(listOfRow)", the 
results are always correct.


Regards,
Rahul

On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw wrote:


Could you clarify what you mean by "inconsistent" and "incorrect"? Are
elements missing/duplicated, or just batched differently?

On Fri, Aug 9, 2019 at 2:18 AM rahul patwari <rahulpatwari8...@gmail.com> wrote:
>
> I only ran in Direct runner. I will run in other runners and let
you know the results.
> I am not setting "streaming" when executing.
>
> On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>> Have you tried running this on more than one runner (e.g.
Dataflow, Flink, Direct)?
>>
>> Are you setting --streaming when executing?
>>
>> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <rahulpatwari8...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am getting inconsistent results when using GroupIntoBatches
PTransform.
>>> I am using Create.of() PTransform to create a PCollection from
in-memory. When a coder is given with Create.of() PTransform, I am
facing the issue.
>>> If the coder is not provided, the results are consistent and
correct(Maybe this is just a coincidence and the problem is at
some other place).
>>> If Batch Size is 1, results are always consistent.
>>>
>>> Not sure if this is an issue with
Serialization/Deserialization (or) GroupIntoBatches (or)
Create.of() PTransform.
>>>
>>> The Java code, expected correct results, and inconsistent
results are available at https://github.com/rahul8383/beam-examples
>>>
>>> Thanks,
>>> Rahul



Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread rahul patwari
Hi Robert,

When PCollection is created using "Create.of(listOfRow)
*.withCoder(RowCoder.of(schema))*", I am getting "Inconsistent" results.
By "Inconsistent", I mean that the result is "Incorrect" sometimes(most of
the times).
By "Incorrect" result, I mean that the elements are missing. The elements
are not duplicated. The elements are not batched differently.

I have used System.identityHashCode(this) to convert PCollection<Row> to
PCollection<KV<Integer, Row>> to apply Stateful Pardo(GroupIntoBatches) as
per your suggestion in this thread


To verify the result, I have used GroupByKey, which should give the
same result as GroupIntoBatches *for my case*.

However, When PCollection is created using "Create.of(listOfRow)", the
results are always correct.

Regards,
Rahul

On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw  wrote:

> Could you clarify what you mean by "inconsistent" and "incorrect"? Are
> elements missing/duplicated, or just batched differently?
>
> On Fri, Aug 9, 2019 at 2:18 AM rahul patwari 
> wrote:
> >
> > I only ran in Direct runner. I will run in other runners and let you
> know the results.
> > I am not setting "streaming" when executing.
> >
> > On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik,  wrote:
> >>
> >> Have you tried running this on more than one runner (e.g. Dataflow,
> Flink, Direct)?
> >>
> >> Are you setting --streaming when executing?
> >>
> >> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <
> rahulpatwari8...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I am getting inconsistent results when using GroupIntoBatches
> PTransform.
> >>> I am using Create.of() PTransform to create a PCollection from
> in-memory. When a coder is given with Create.of() PTransform, I am facing
> the issue.
> >>> If the coder is not provided, the results are consistent and
> correct(Maybe this is just a coincidence and the problem is at some other
> place).
> >>> If Batch Size is 1, results are always consistent.
> >>>
> >>> Not sure if this is an issue with Serialization/Deserialization (or)
> GroupIntoBatches (or) Create.of() PTransform.
> >>>
> >>> The Java code, expected correct results, and inconsistent results are
> available at https://github.com/rahul8383/beam-examples
> >>>
> >>> Thanks,
> >>> Rahul
>


Re: (mini-doc) Beam (Flink) portable job templates

2019-08-09 Thread Robert Bradshaw
The expansion service is a separate service. (The flink jar happens to
bring both up.) However, there is negotiation to receive/validate the
pipeline options.

On Fri, Aug 9, 2019 at 1:54 AM Thomas Weise  wrote:
>
> We would also need to consider cross-language pipelines that (currently) 
> assume the interaction with an expansion service at construction time.
>
> On Thu, Aug 8, 2019, 4:38 PM Kyle Weaver  wrote:
>>
>> > It might also be useful to have the option to just output the proto and 
>> > artifacts, as an alternative to the jar file.
>>
>> Sure, that wouldn't be too big a change if we were to decide to go the SDK 
>> route.
>>
>> > For the Flink entry point we would need to allow for the job server to be 
>> > used as a library.
>>
>> We don't need the whole job server, we only need to add a main method to 
>> FlinkPipelineRunner [1] as the entry point, which would basically just do 
>> the setup described in the doc then call FlinkPipelineRunner::run.
>>
>> [1] 
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L53
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Thu, Aug 8, 2019 at 4:21 PM Thomas Weise  wrote:
>>>
>>> Hi Kyle,
>>>
>>> It might also be useful to have the option to just output the proto and 
>>> artifacts, as an alternative to the jar file.
>>>
>>> For the Flink entry point we would need to allow for the job server to be 
>>> used as a library. It would probably not be too hard to have the Flink job 
>>> constructed via the context execution environment, which would require no 
>>> changes on the Flink side.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Thu, Aug 8, 2019 at 9:52 AM Kyle Weaver  wrote:

 Re Javaless/serverless solution:
 I take it this would probably mean that we would construct the jar 
 directly from the SDK. There are advantages to this: full separation of 
 Python and Java environments, no need for a job server, and likely a 
 simpler implementation, since we'd no longer have to work within the 
 constraints of the existing job server infrastructure. The only downside I 
 can think of is the additional cost of implementing/maintaining jar 
 creation code in each SDK, but that cost may be acceptable if it's simple 
 enough.

 Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


 On Thu, Aug 8, 2019 at 9:31 AM Thomas Weise  wrote:
>
>
>
> On Thu, Aug 8, 2019 at 8:29 AM Robert Bradshaw  
> wrote:
>>
>> > Before assembling the jar, the job server runs to create the 
>> > ingredients. That requires the (matching) Java environment on the 
>> > Python developers machine.
>>
>> We can run the job server and have it create the jar (and if we keep
>> the job server running we can use it to interact with the running
>> job). However, if the jar layout is simple enough, there's no need to
>> even build it from Java.
>>
>> Taken to the extreme, this is a one-shot, jar-based JobService API. We
>> choose a standard layout of where to put the pipeline description and
>> artifacts, and can "augment" an existing jar (that has a
>> runner-specific main class whose entry point knows how to read this
>> data to kick off a pipeline as if it were a users driver code) into
>> one that has a portable pipeline packaged into it for submission to a
>> cluster.
>
>
> It would be nice if the Python developer doesn't have to run anything 
> Java at all.
>
> As we just discussed offline, this could be accomplished by including
> the proto that is produced by the SDK into the pre-existing jar.
>
> And if the jar has an entry point that creates the Flink job in the 
> prescribed manner [1], it can be directly submitted to the Flink REST 
> API. That would allow for Java free client.
>
> [1] 
> https://lists.apache.org/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
>
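
To make the Java-free client idea concrete, a minimal sketch from the
Python side (the jar name and the BEAM-PIPELINE/pipeline.pb entry path are
assumed conventions for illustration, not an agreed layout):

    import zipfile

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    p = beam.Pipeline(options=PipelineOptions())
    _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)

    # Serialize the portable pipeline proto that the jar's runner-specific
    # entry point would read at submission time.
    pipeline_proto = p.to_runner_api()

    # Augment a pre-built runner jar with the pipeline at the agreed path,
    # with no Java required on the Python developer's machine.
    with zipfile.ZipFile('beam-runner-with-job.jar', 'a') as jar:
        jar.writestr('BEAM-PIPELINE/pipeline.pb',
                     pipeline_proto.SerializeToString())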


Re: Beam Python Portable Runner - Adding timeout to JobServer grpc calls

2019-08-09 Thread Robert Bradshaw
If we do provide a configuration value for this, I would make it have a
fairly large default and re-use the flag for all RPCs of a similar nature,
rather than tweaking it for this particular service only.

On Fri, Aug 9, 2019 at 2:58 AM Ahmet Altay  wrote:

> Default plus a flag to override sounds reasonable. Although from Dataflow
> experience I do not remember timeouts causing issues and each new added
> flag adds complexity. What do others think?
>
> On Thu, Aug 8, 2019 at 11:38 AM Kyle Weaver  wrote:
>
>> If we do make a default, I still think it should be configurable via a
>> flag. I can't think of why the prepare, stage artifact, job state, or job
>> message requests might take more than 60 seconds, but you never know,
>> particularly with artifact staging, which might be uploading artifacts to
>> distributed storage.
>>
>> I assume the run request itself would not be subject to timeouts, as
>> running the pipeline can be assumed to take significantly longer than the
>> setup work.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Thu, Aug 8, 2019 at 11:20 AM Enrico Canzonieri 
>> wrote:
>>
>>> Default timeout with no flag may work as well.
>>> The main consideration here is whether some api calls may take longer
>>> than 60 seconds because of the complexity of the users' Beam pipeline. E.g.
>>> Could job_service.Prepare() take longer than 60 seconds if the given Beam
>>> pipeline is extremely complex?
>>>
>>> Basically if there are cases when the user code may cause the call
>>> duration to increase to the point the timeout prevents submitting the app
>>> itself then we should consider having a flag.
>>>
>>> On 2019/08/07 20:13:12, Ahmet Altay wrote:
>>> > Could we pick a default timeout value instead of introducing a flag?
>>> > We use 60 seconds as the default timeout for the http client [1], we
>>> > can do the same here.
>>> >
>>> > [1]
>>> > https://github.com/apache/beam/blob/3a182d64c86ad038692800f5c343659ab0b935b0/sdks/python/apache_beam/internal/http_client.py#L32
>>> >
>>> > On Wed, Aug 7, 2019 at 11:53 AM enrico canzonieri wrote:
>>> >
>>> > > Hello,
>>> > >
>>> > > I noticed that the calls to the JobServer from the Python SDK do not
>>> > > have timeouts. If I'm not mistaken that means that the call to
>>> > > pipeline.run() could hang forever if the JobServer is not running
>>> > > (or failing to start). E.g.
>>> > > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307
>>> > > the call to Prepare() doesn't provide any timeout value and the same
>>> > > applies to other JobServer requests.
>>> > > I was considering adding a --job-server-request-timeout to the
>>> > > PortableOptions class to be used in the JobServer interactions inside
>>> > > portable_runner.py.
>>> > > Is there any specific reason why the timeout is not currently
>>> > > supported? Does anybody have any objection adding the jobserver
>>> > > timeout? I could volunteer to file a ticket and submit a pr for this.
>>> > >
>>> > > Cheers,
>>> > > Enrico Canzonieri
>>>
>>>


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread Robert Bradshaw
Could you clarify what you mean by "inconsistent" and "incorrect"? Are
elements missing/duplicated, or just batched differently?

On Fri, Aug 9, 2019 at 2:18 AM rahul patwari  wrote:
>
> I only ran in Direct runner. I will run in other runners and let you know the 
> results.
> I am not setting "streaming" when executing.
>
> On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik,  wrote:
>>
>> Have you tried running this on more than one runner (e.g. Dataflow, Flink, 
>> Direct)?
>>
>> Are you setting --streaming when executing?
>>
>> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari  
>> wrote:
>>>
>>> Hi,
>>>
>>> I am getting inconsistent results when using GroupIntoBatches PTransform.
>>> I am using Create.of() PTransform to create a PCollection from in-memory. 
>>> When a coder is given with Create.of() PTransform, I am facing the issue.
>>> If the coder is not provided, the results are consistent and correct(Maybe 
>>> this is just a coincidence and the problem is at some other place).
>>> If Batch Size is 1, results are always consistent.
>>>
>>> Not sure if this is an issue with Serialization/Deserialization (or) 
>>> GroupIntoBatches (or) Create.of() PTransform.
>>>
>>> The Java code, expected correct results, and inconsistent results are 
>>> available at https://github.com/rahul8383/beam-examples
>>>
>>> Thanks,
>>> Rahul