Re: [Proposal] Add exception handling option to MapElements

2018-10-02 Thread Jean-Baptiste Onofré
It sounds interesting.

Thanks !
Regards
JB

On 03/10/2018 01:49, Jeff Klukas wrote:
> I've seen a few Beam users mention the need to handle errors in their
> transforms by using a try/catch and routing to different outputs based
> on whether an exception was thrown. This was particularly nicely written
> up in a post by Vallery Lancey:
> 
> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
> 
> I'd love to see this pattern better supported directly in the Beam API,
> because it currently requires the user to implement a full DoFn even for
> the simplest cases.
> 
> I propose we support a MapElements-like transform that allows the
> user to specify a set of exceptions to catch and route to a failure
> output. Something like:
> 
> MapElements
> .via(myFunctionThatThrows)
> .withSuccessTag(successTag)
> .withFailureTag(failureTag, JsonParsingException.class)
> 
> which would output a PCollectionTuple with both the successful outcomes
> of the map operation and also a collection of the inputs that threw
> JsonParsingException.
> 
> To make this more concrete, I put together a proof of concept PR:
> https://github.com/apache/beam/pull/6518  I'd appreciate feedback about
> whether this seems like a worthwhile addition and a feasible approach.

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: SplittableDoFn

2018-10-02 Thread Jean-Baptiste Onofré
Nice one Alex !

Thanks
Regards
JB

On 02/10/2018 23:19, Alex Van Boxel wrote:
> Don't want to crash the tech discussion here, but... I just gave a
> session at the Beam Summit about Splittable DoFn's from a user's
> perspective (from things I could gather from the documentation and
> experimentation). Here is the slide deck, maybe it could be
> useful:
> https://docs.google.com/presentation/d/1dSc6oKh5pZItQPB_QiUyEoLT2TebMnj-pmdGipkVFPk/edit?usp=sharing
> (quite proud of the animations though ;-)
> 
>  _/
> _/ Alex Van Boxel
> 
> 
> On Thu, Sep 27, 2018 at 12:04 AM Lukasz Cwik wrote:
> 
> Reuven, just inside the restriction tracker itself which is scoped
> per executing SplittableDoFn. A user could incorrectly write the
> synchronization since they are currently responsible for writing it
> though.
> 
> On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax wrote:
> 
> is synchronization over an entire work item, or just inside
> restriction tracker? my concern is that some runners (especially
> streaming runners) might have hundreds or thousands of parallel
> work items being processed for the same SDF (for different
> keys), and I'm afraid of creating lock-contention bottlenecks.
> 
> On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik wrote:
> 
> The synchronization is related to Java thread safety since
> there is likely to be concurrent access needed to a
> restriction tracker to properly handle accessing the backlog
> and splitting concurrently from when the users DoFn is
> executing and updating the restriction tracker. This is
> similar to the Java thread safety needed in BoundedSource
> and UnboundedSource for fraction consumed, backlog bytes,
> and splitting.
> 
> On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax wrote:
> 
> Can you give details on what the synchronization is per?
> Is it per key, or global to each worker?
> 
> On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik wrote:
> 
> As I was looking at the SplittableDoFn API while
> working towards making a proposal for how the
> backlog/splitting API could look, I found some sharp
> edges that could be improved.
> 
> I noticed that:
> 1) We require users to write thread-safe code, which
> is something that we haven't asked of users when
> writing a DoFn.
> 2) We expose "internal" methods within the
> RestrictionTracker that are not meant to be used by
> the runner.
> 
> I can fix these issues by giving the user a
> forwarding restriction tracker[1] that provides an
> appropriate level of synchronization as needed and
> also provides the necessary observation hooks to see
> when a claim failed or succeeded.
> 
> This requires a change to our experimental API since
> we need to pass a RestrictionTracker to the
> @ProcessElement method instead of a sub-type of
> RestrictionTracker.
> @ProcessElement
> processElement(ProcessContext context,
> OffsetRangeTracker tracker) { ... }
> becomes:
> @ProcessElement
> processElement(ProcessContext context,
> RestrictionTracker<OffsetRange, Long> tracker) { ... }
> 
> This provides an additional benefit that it prevents
> users from working around the RestrictionTracker
> APIs and potentially making underlying changes to
> the tracker outside of the tryClaim call.
> 
> Full implementation is available within this PR[2]
> and was wondering what people thought.
> 
> 1: 
> https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
> 2: https://github.com/apache/beam/pull/6467
> 
> 
> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik wrote:
> 
> The changes to the API have not been proposed
> yet. So far it has all been about what is the
> representation and why.
> 
> For splitting, the current idea has been about
> using the backlog as a way of telling the
>

Re: Source or ParDo for reading data in PostgreSQL in Python

2018-10-02 Thread Jonathan Perron
I found that I forgot to perform a beam.Create() first... So no need to 
answer me, this solves my problem.


On 2018/10/02 08:40:46, Jonathan Perron  wrote:
> Hello,
>
> I am looking for some way to access data stored in PostgreSQL and don't
> know if I should go for a Sink or ParDo operations. It is stated that
> ParDo could be used but I'm not sure this is what will solve my problem,
> so here I am! I managed to write in the database with only ParDo
> operations, so I guess it is also possible here.
>
> Some details about my use case:
>
> * The Python SDK is used;
>
> * Reading in the database is the first operation of the pipeline before
> making some calculation;
>
> * It is performed with SQLAlchemy, but could also be done with psycopg2;
>
> * I don't think parallelizing this operation is necessary as the queries
> are and will stay really simple (i.e. SELECT foo FROM bar WHERE fuzz).
>
> Here are my DoFn classes:
>
> class ExtractFromPostgreSQLFn(beam.DoFn):
>     """
>     Extract PCollection from PostgreSQL
>     """
>
>     def start_bundle(self):
>         self._session = Session()
>
>     def process(self, element):
>         raise NotImplementedError
>
>     def finish_bundle(self):
>         self._session.close()
>
>
> class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
>     def start_bundle(self, arg1, arg2):
>         super(ReadEntityFromPostgresqlFn, self).start_bundle()
>         self._arg1 = arg1
>         self._arg2 = arg2
>
>     def process(self, element):
>         entities = (
>             self._session.query(Entity)
>             .filter(Entity.arg1 == self._arg1)
>             .filter(Entity.arg2 == self._arg2)
>             .all()
>         )
>         yield (self._arg1, self._arg2)
>
> As I said, I used it just after the initialization of the pipeline:
>
> p = beam.Pipeline(options=pipeline_options)
> psql_entities = p | "Extract Entities from PSQL backup" >> beam.ParDo(ReadDatastoreUsersFromPostgresqlFn())
>
> Unfortunately, I end up with an AttributeError: 'PBegin' object has no
> attribute 'windowing' error.
>
> Where did I make a mistake? I take every input you could provide me on
> this topic.
>
> Thanks for your time,
>
> Jonathan

--



OptimData
71 rue Desnouettes, 75015 Paris, France

http://inuse.eu 






Re: Purpose of GcpApiSurfaceTest in google-cloud-platform SDK

2018-10-02 Thread Kenneth Knowles
Worth noting that these API surface tests are not in a great state; they
test everything on the class path rather than just true dependencies. I
don't know that they still ensure the desired property; they certainly
reject things that they need not. From your description, it sounds like in
your case they have worked as intended.

Kenn

On Tue, Oct 2, 2018 at 1:37 PM Andrew Pilloud  wrote:

> Hi Ken,
>
> My understanding is that this test is intended to prevent other packages
> from appearing on the public API surface of the Beam package. For example,
> guava can't appear on the Beam public API. This is to enable users to
> depend on different versions of these packages than what Beam depends on.
>
> See
> https://issues.apache.org/jira/browse/BEAM-878
>
> We might be able to provide more guidance if you open a Beam PR with your
> changes so we can see the diff and test failures.
>
> Andrew
>
> On Tue, Oct 2, 2018 at 1:02 PM Kenneth Jung  wrote:
>
>> Hi folks,
>>
>> I'm working on adding support for a new API to the google-cloud-platform
>> SDK, and some of my changes have caused the API surface test
>> 
>> to start failing. However, it's not totally clear to me what the purpose of
>> this test is -- it doesn't fail when new build-time dependencies are added,
>> but only when new types appear on the public API surface of the package.
>> What is the purpose of this test? Is it to prevent accidental additions of
>> new dependencies, or to make sure that the shadow configuration stays in
>> sync with the content of the package, or is there something else I'm not
>> thinking of? This will affect how I go about addressing the failures.
>>
>> Thanks
>> Ken
>>
>


Re: JIRA permission request

2018-10-02 Thread Kengo Seki
Thanks Kenneth, filed and submitted :)

Kengo Seki 

On Wed, Oct 3, 2018 at 11:21 AM Kenneth Knowles  wrote:
>
> Done. Thanks in advance for your contribution!
>
> On Tue, Oct 2, 2018 at 7:05 PM Kengo Seki  wrote:
>>
>> Hi,
>>
>> I found a minor issue in JdbcIO Javadoc and would like to submit a PR for it.
>> Would you add me to the contributor list? My JIRA id is "sekikn".
>>
>> Thanks!
>>
>> Kengo Seki 


Re: [Proposal] Add exception handling option to MapElements

2018-10-02 Thread Kenneth Knowles
Great essay. Thanks for the link!

I think this idea is very cool. It takes idiomatic Java exception-throwing
in myFunctionThatThrows and turns it into the FP fundamentals, where
success/failure(s) form a disjoint union and you partition on tag. I'm
interested in the details. Maybe also worth doing for flatMap? (others...?)

Kenn

On Tue, Oct 2, 2018 at 4:49 PM Jeff Klukas  wrote:

> I've seen a few Beam users mention the need to handle errors in their
> transforms by using a try/catch and routing to different outputs based on
> whether an exception was thrown. This was particularly nicely written up in
> a post by Vallery Lancey:
>
>
> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>
> I'd love to see this pattern better supported directly in the Beam API,
> because it currently requires the user to implement a full DoFn even for
> the simplest cases.
>
> I propose we support a MapElements-like transform that allows the user
> to specify a set of exceptions to catch and route to a failure output.
> Something like:
>
> MapElements
> .via(myFunctionThatThrows)
> .withSuccessTag(successTag)
> .withFailureTag(failureTag, JsonParsingException.class)
>
> which would output a PCollectionTuple with both the successful outcomes of
> the map operation and also a collection of the inputs that threw
> JsonParsingException.
>
> To make this more concrete, I put together a proof of concept PR:
> https://github.com/apache/beam/pull/6518  I'd appreciate feedback about
> whether this seems like a worthwhile addition and a feasible approach.
>


Re: JIRA permission request

2018-10-02 Thread Kenneth Knowles
Done. Thanks in advance for your contribution!

On Tue, Oct 2, 2018 at 7:05 PM Kengo Seki  wrote:

> Hi,
>
> I found a minor issue in JdbcIO Javadoc and would like to submit a PR for
> it.
> Would you add me to the contributor list? My JIRA id is "sekikn".
>
> Thanks!
>
> Kengo Seki 
>


JIRA permission request

2018-10-02 Thread Kengo Seki
Hi,

I found a minor issue in JdbcIO Javadoc and would like to submit a PR for
it.
Would you add me to the contributor list? My JIRA id is "sekikn".

Thanks!

Kengo Seki 


[Proposal] Add exception handling option to MapElements

2018-10-02 Thread Jeff Klukas
I've seen a few Beam users mention the need to handle errors in their
transforms by using a try/catch and routing to different outputs based on
whether an exception was thrown. This was particularly nicely written up in
a post by Vallery Lancey:

https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a

I'd love to see this pattern better supported directly in the Beam API,
because it currently requires the user to implement a full DoFn even for
the simplest cases.

I propose we support a MapElements-like transform that allows the user
to specify a set of exceptions to catch and route to a failure output.
Something like:

MapElements
.via(myFunctionThatThrows)
.withSuccessTag(successTag)
.withFailureTag(failureTag, JsonParsingException.class)

which would output a PCollectionTuple with both the successful outcomes of
the map operation and also a collection of the inputs that threw
JsonParsingException.

To make this more concrete, I put together a proof of concept PR:
https://github.com/apache/beam/pull/6518  I'd appreciate feedback about
whether this seems like a worthwhile addition and a feasible approach.
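
To sketch how the proposed transform could be consumed (the MyRecord type, the tag
declarations, and the use of get() on the resulting PCollectionTuple are illustrative
assumptions based on the description above, not code taken from the PR):

// Illustrative only: successTag/failureTag are TupleTags declared by the user and
// MyRecord stands in for the mapping function's output type.
TupleTag<MyRecord> successTag = new TupleTag<MyRecord>() {};
TupleTag<String> failureTag = new TupleTag<String>() {};

PCollectionTuple results = input.apply(
    MapElements
        .via(myFunctionThatThrows)   // may throw JsonParsingException for bad input
        .withSuccessTag(successTag)
        .withFailureTag(failureTag, JsonParsingException.class));

PCollection<MyRecord> parsed = results.get(successTag);  // successful map outputs
PCollection<String> failed = results.get(failureTag);    // inputs that threw the exception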


Re: SplittableDoFn

2018-10-02 Thread Eugene Kirpichov
Very cool, thanks Alex!

On Tue, Oct 2, 2018 at 2:19 PM Alex Van Boxel  wrote:

> Don't want to crash the tech discussion here, but... I just gave a session
> at the Beam Summit about Splittable DoFn's from a user's perspective (from
> things I could gather from the documentation and experimentation). Here is
> the slide deck, maybe it could be useful:
> https://docs.google.com/presentation/d/1dSc6oKh5pZItQPB_QiUyEoLT2TebMnj-pmdGipkVFPk/edit?usp=sharing
>  (quite
> proud of the animations though ;-)
>
>  _/
>
> _/ Alex Van Boxel
>
>
> On Thu, Sep 27, 2018 at 12:04 AM Lukasz Cwik  wrote:
>
>> Reuven, just inside the restriction tracker itself which is scoped per
>> executing SplittableDoFn. A user could incorrectly write the
>> synchronization since they are currently responsible for writing it though.
>>
>> On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax  wrote:
>>
>>> is synchronization over an entire work item, or just inside restriction
>>> tracker? my concern is that some runners (especially streaming runners)
>>> might have hundreds or thousands of parallel work items being processed for
>>> the same SDF (for different keys), and I'm afraid of creating
>>> lock-contention bottlenecks.
>>>
>>> On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik  wrote:
>>>
 The synchronization is related to Java thread safety since there is
 likely to be concurrent access needed to a restriction tracker to properly
 handle accessing the backlog and splitting concurrently from when the users
 DoFn is executing and updating the restriction tracker. This is similar to
 the Java thread safety needed in BoundedSource and UnboundedSource for
 fraction consumed, backlog bytes, and splitting.

 On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax  wrote:

> Can you give details on what the synchronization is per? Is it per
> key, or global to each worker?
>
> On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik  wrote:
>
>> As I was looking at the SplittableDoFn API while working towards
>> making a proposal for how the backlog/splitting API could look, I found
>> some sharp edges that could be improved.
>>
>> I noticed that:
>> 1) We require users to write thread-safe code, which is something that
>> we haven't asked of users when writing a DoFn.
>> 2) We expose "internal" methods within the RestrictionTracker that are not
>> meant to be used by the runner.
>>
>> I can fix these issues by giving the user a forwarding restriction
>> tracker[1] that provides an appropriate level of synchronization as 
>> needed
>> and also provides the necessary observation hooks to see when a claim
>> failed or succeeded.
>>
>> This requires a change to our experimental API since we need to pass
>> a RestrictionTracker to the @ProcessElement method instead of a sub-type 
>> of
>> RestrictionTracker.
>> @ProcessElement
>> processElement(ProcessContext context, OffsetRangeTracker tracker) {
>> ... }
>> becomes:
>> @ProcessElement
>> processElement(ProcessContext context,
>> RestrictionTracker<OffsetRange, Long> tracker) { ... }
>>
>> This provides an additional benefit that it prevents users from
>> working around the RestrictionTracker APIs and potentially making
>> underlying changes to the tracker outside of the tryClaim call.
>>
>> Full implementation is available within this PR[2] and was wondering
>> what people thought.
>>
>> 1:
>> https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
>> 2: https://github.com/apache/beam/pull/6467
>>
>>
>> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik 
>> wrote:
>>
>>> The changes to the API have not been proposed yet. So far it has all
>>> been about what is the representation and why.
>>>
>>> For splitting, the current idea has been about using the backlog as
>>> a way of telling the SplittableDoFn where to split, so it would be in 
>>> terms
>>> of whatever the SDK decided to report.
>>> The runner always chooses a number for backlog that is relative to
>>> the SDK's reported backlog. It would be up to the SDK to round/clamp the
>>> number given by the Runner to represent something meaningful for itself.
>>> For example if the backlog that the SDK was reporting was bytes
>>> remaining in a file such as 500, then the Runner could provide some 
>>> value
>>> like 212.2 which the SDK would then round to 212.
>>> If the backlog that the SDK was reporting was 57 pubsub messages, then
>>> the Runner could provide a value like 300 which would mean to read 57
>>> values and then another 243 as part of the current restriction.
>>>
>>> I believe that BoundedSource/UnboundedSource will have wrappers
>>> added that provide a basic SplittableDoFn implementation so existing IOs
>>> should be migrated over without API changes.
>>>
>>> On

Jenkins build is back to normal : beam_SeedJob #2734

2018-10-02 Thread Apache Jenkins Server
See 



Build failed in Jenkins: beam_SeedJob #2733

2018-10-02 Thread Apache Jenkins Server
See 

--
GitHub pull request #6549 of commit f844fb681ac7154e371f9fe92b25e36be331c937, 
no merge conflicts.
Setting status of f844fb681ac7154e371f9fe92b25e36be331c937 to PENDING with url 
https://builds.apache.org/job/beam_SeedJob/2733/ and message: 'Build started 
for merge commit.'
Using context: Jenkins: Seed Job
[EnvInject] - Loading node environment variables.
Building remotely on beam12 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/6549/*:refs/remotes/origin/pr/6549/*
 > git rev-parse refs/remotes/origin/pr/6549/merge^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/pr/6549/merge^{commit} # timeout=10
Checking out Revision 0272214ce8f90505bb4ce21437aa945635363603 
(refs/remotes/origin/pr/6549/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 0272214ce8f90505bb4ce21437aa945635363603
Commit message: "Merge f844fb681ac7154e371f9fe92b25e36be331c937 into 
8a523adbb7f8f796f2cae05e6a2897938ce1288b"
First time build. Skipping changelog.
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
Processing DSL script job_00_seed.groovy
Processing DSL script job_Dependency_Check.groovy
Processing DSL script job_Inventory.groovy
Processing DSL script job_PerformanceTests_Dataflow.groovy
Processing DSL script job_PerformanceTests_FileBasedIO_IT.groovy
Processing DSL script job_PerformanceTests_FileBasedIO_IT_HDFS.groovy
Processing DSL script job_PerformanceTests_HadoopInputFormat.groovy
Processing DSL script job_PerformanceTests_JDBC.groovy
Processing DSL script job_PerformanceTests_MongoDBIO_IT.groovy
Processing DSL script job_PerformanceTests_Python.groovy
Processing DSL script job_PerformanceTests_Spark.groovy
Processing DSL script job_PostCommit_Go_GradleBuild.groovy
Processing DSL script job_PostCommit_Java_GradleBuild.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Dataflow.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Direct.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Flink.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Spark.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Apex.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Dataflow.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Gearpump.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Samza.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Spark.groovy
Processing DSL script job_PostCommit_Python_ValidatesContainer_Dataflow.groovy
Processing DSL script job_PostCommit_Python_ValidatesRunner_Dataflow.groovy
Processing DSL script job_PostCommit_Python_ValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Python_Verify.groovy
Processing DSL script job_PostCommit_Website_Publish.groovy
ERROR: (PostcommitJobBuilder.groovy, line 66) No such property: delegate for 
class: PostcommitJobBuilder


Re: SplittableDoFn

2018-10-02 Thread Alex Van Boxel
Don't want to crash the tech discussion here, but... I just gave a session
at the Beam Summit about Splittable DoFn's from a user's perspective (from
things I could gather from the documentation and experimentation). Here is
the slide deck, maybe it could be useful:
https://docs.google.com/presentation/d/1dSc6oKh5pZItQPB_QiUyEoLT2TebMnj-pmdGipkVFPk/edit?usp=sharing
(quite
proud of the animations though ;-)

 _/
_/ Alex Van Boxel


On Thu, Sep 27, 2018 at 12:04 AM Lukasz Cwik  wrote:

> Reuven, just inside the restriction tracker itself which is scoped per
> executing SplittableDoFn. A user could incorrectly write the
> synchronization since they are currently responsible for writing it though.
>
> On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax  wrote:
>
>> is synchronization over an entire work item, or just inside restriction
>> tracker? my concern is that some runners (especially streaming runners)
>> might have hundreds or thousands of parallel work items being processed for
>> the same SDF (for different keys), and I'm afraid of creating
>> lock-contention bottlenecks.
>>
>> On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik  wrote:
>>
>>> The synchronization is related to Java thread safety since there is
>>> likely to be concurrent access needed to a restriction tracker to properly
>>> handle accessing the backlog and splitting concurrently from when the users
>>> DoFn is executing and updating the restriction tracker. This is similar to
>>> the Java thread safety needed in BoundedSource and UnboundedSource for
>>> fraction consumed, backlog bytes, and splitting.
>>>
>>> On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax  wrote:
>>>
 Can you give details on what the synchronization is per? Is it per key,
 or global to each worker?

 On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik  wrote:

> As I was looking at the SplittableDoFn API while working towards
> making a proposal for how the backlog/splitting API could look, I found
> some sharp edges that could be improved.
>
> I noticed that:
> 1) We require users to write thread-safe code, which is something that
> we haven't asked of users when writing a DoFn.
> 2) We expose "internal" methods within the RestrictionTracker that are not
> meant to be used by the runner.
>
> I can fix these issues by giving the user a forwarding restriction
> tracker[1] that provides an appropriate level of synchronization as needed
> and also provides the necessary observation hooks to see when a claim
> failed or succeeded.
>
> This requires a change to our experimental API since we need to pass
> a RestrictionTracker to the @ProcessElement method instead of a sub-type 
> of
> RestrictionTracker.
> @ProcessElement
> processElement(ProcessContext context, OffsetRangeTracker tracker) {
> ... }
> becomes:
> @ProcessElement
> processElement(ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) { ... }
>
> This provides an additional benefit that it prevents users from
> working around the RestrictionTracker APIs and potentially making
> underlying changes to the tracker outside of the tryClaim call.
>
> Full implementation is available within this PR[2] and was wondering
> what people thought.
>
> 1:
> https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
> 2: https://github.com/apache/beam/pull/6467
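
To make the proposed signature concrete, here is a minimal sketch of a SplittableDoFn
written against the generic-parameter form quoted above; the element type, the 0-100
range, and the per-offset output are placeholders rather than anything from the PR.

// Illustrative only; uses DoFn, OffsetRange, and RestrictionTracker from the Java SDK.
class ReadRangeFn extends DoFn<String, Long> {
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(String element) {
    return new OffsetRange(0, 100);  // placeholder restriction
  }

  @ProcessElement
  public void processElement(ProcessContext context,
                             RestrictionTracker<OffsetRange, Long> tracker) {
    // tryClaim is the only interaction with the restriction, so a forwarding
    // tracker can observe and synchronize these calls as described above.
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      context.output(i);
    }
  }
}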
>
>
> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik  wrote:
>
>> The changes to the API have not been proposed yet. So far it has all
>> been about what is the representation and why.
>>
>> For splitting, the current idea has been about using the backlog as a
>> way of telling the SplittableDoFn where to split, so it would be in terms
>> of whatever the SDK decided to report.
>> The runner always chooses a number for backlog that is relative to
>> the SDK's reported backlog. It would be up to the SDK to round/clamp the
>> number given by the Runner to represent something meaningful for itself.
>> For example if the backlog that the SDK was reporting was bytes
>> remaining in a file such as 500, then the Runner could provide some value
>> like 212.2 which the SDK would then round to 212.
>> If the backlog that the SDK was reporting was 57 pubsub messages, then
>> the Runner could provide a value like 300 which would mean to read 57
>> values and then another 243 as part of the current restriction.
>>
>> I believe that BoundedSource/UnboundedSource will have wrappers added
>> that provide a basic SplittableDoFn implementation so existing IOs should
>> be migrated over without API changes.
>>
>> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía 
>> wrote:
>>
>>> Thanks a lot Luke for bringing this back to the mailing list and
>>> Ryan for taking
>>> the notes.
>>>
>>> I would like to know if there

Re: Purpose of GcpApiSurfaceTest in google-cloud-platform SDK

2018-10-02 Thread Andrew Pilloud
Hi Ken,

My understanding is that this test is intended to prevent other packages
from appearing on the public API surface of the Beam package. For example,
guava can't appear on the Beam public API. This is to enable users to
depend on different versions of these packages than what Beam depends on.

See
https://issues.apache.org/jira/browse/BEAM-878

We might be able to provide more guidance if you open a Beam PR with your
changes so we can see the diff and test failures.

Andrew

On Tue, Oct 2, 2018 at 1:02 PM Kenneth Jung  wrote:

> Hi folks,
>
> I'm working on adding support for a new API to the google-cloud-platform
> SDK, and some of my changes have caused the API surface test
> 
> to start failing. However, it's not totally clear to me what the purpose of
> this test is -- it doesn't fail when new build-time dependencies are added,
> but only when new types appear on the public API surface of the package.
> What is the purpose of this test? Is it to prevent accidental additions of
> new dependencies, or to make sure that the shadow configuration stays in
> sync with the content of the package, or is there something else I'm not
> thinking of? This will affect how I go about addressing the failures.
>
> Thanks
> Ken
>


Purpose of GcpApiSurfaceTest in google-cloud-platform SDK

2018-10-02 Thread Kenneth Jung
Hi folks,

I'm working on adding support for a new API to the google-cloud-platform
SDK, and some of my changes have caused the API surface test

to start failing. However, it's not totally clear to me what the purpose of
this test is -- it doesn't fail when new build-time dependencies are added,
but only when new types appear on the public API surface of the package.
What is the purpose of this test? Is it to prevent accidental additions of
new dependencies, or to make sure that the shadow configuration stays in
sync with the content of the package, or is there something else I'm not
thinking of? This will affect how I go about addressing the failures.

Thanks
Ken


Re: Python 3: final step

2018-10-02 Thread Pablo Estrada
Very cool : ) I'm also available to review / merge if you need help from my
side.
Best
-P.

On Tue, Oct 2, 2018 at 7:45 AM Rakesh Kumar  wrote:

> Hi Rob,
>
> I am Rakesh Kumar, and I am using the Beam SDK for one of my projects at Lyft. I have
> been working closely with Thomas Weise. I have already met a couple of
> Python SDK developers in person.
> I am interested to help migrate to Python 3. You can assign me PRs for
> review. I am also more than happy to take a simple ticket to begin
> development work on Beam.
>
> Thank you,
> Rakesh
>
> On Wed, Sep 5, 2018 at 9:12 AM Robbe Sneyders 
> wrote:
>
>> Hi everyone,
>>
>> With the merging of [1], we now have Python 3 tests running on Jenkins,
>> which allows us to move forward with the last step of the Python 3 porting.
>>
>> You can follow the progress on the Jira Kanban Board [2]. If you're
>> interested in helping by porting a module, you can assign one of the issues
>> to yourself and start coding. You can find the different steps outlined in
>> the design document [3].
>>
>> We could also use some extra reviewers. If you're interested, let us
>> know, and we'll tag you in our PRs.
>>
>> [1] https://github.com/apache/beam/pull/6266
>> [2] https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=245
>> [3] https://s.apache.org/beam-python-3
>>
>> kind regards,
>> Robbe
>> --
>>
>>
>> * Robbe Sneyders*
>>
>> ML6 Gent
>> 
>>
>> M: +32 474 71 31 08
>>
> --
> Rakesh Kumar
> Software Engineer
> 510-761-1364
>
> 
>


Re: Python 3: final step

2018-10-02 Thread Rakesh Kumar
Hi Rob,

I am Rakesh Kumar, and I am using the Beam SDK for one of my projects at Lyft. I have
been working closely with Thomas Weise. I have already met a couple of
Python SDK developers in person.
I am interested to help migrate to Python 3. You can assign me PRs for
review. I am also more than happy to take a simple ticket to begin
development work on Beam.

Thank you,
Rakesh

On Wed, Sep 5, 2018 at 9:12 AM Robbe Sneyders  wrote:

> Hi everyone,
>
> With the merging of [1], we now have Python 3 tests running on Jenkins,
> which allows us to move forward with the last step of the Python 3 porting.
>
> You can follow the progress on the Jira Kanban Board [2]. If you're
> interested in helping by porting a module, you can assign one of the issues
> to yourself and start coding. You can find the different steps outlined in
> the design document [3].
>
> We could also use some extra reviewers. If you're interested, let us know,
> and we'll tag you in our PRs.
>
> [1] https://github.com/apache/beam/pull/6266
> [2] https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=245
> [3] https://s.apache.org/beam-python-3
>
> kind regards,
> Robbe
> --
>
>
> * Robbe Sneyders*
>
> ML6 Gent
> 
>
> M: +32 474 71 31 08
>
-- 
Rakesh Kumar
Software Engineer
510-761-1364




Re: Python typing library is not provisional in Python 3.7

2018-10-02 Thread Manu Zhang
For the record, I logged https://issues.apache.org/jira/browse/BEAM-5607 and sent a
PR. This may also help anyone who runs the tests locally in Python 3.7.
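
For anyone hitting the same failure locally: one common way to keep the PyPI typing
backport off Python 3.7 is a PEP 508 environment marker in setup.py. The snippet below
is only a sketch; the package name and version bounds are illustrative and may not
match what the Beam fix actually does.

# Illustrative setup.py excerpt; 'example-package' and the pins are placeholders.
from setuptools import setup

setup(
    name='example-package',
    install_requires=[
        # Pull in the PyPI "typing" backport only where the standard-library
        # module is missing or too old; Python 3.7 then uses the stdlib module
        # and never hits the _abc_registry error shown in this thread.
        'typing>=3.6.0; python_version < "3.5.3"',
    ],
)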
On Sep 29, 2018 at 3:06 AM +0800, Valentyn Tymofieiev wrote:
> Hi Manu,
>
> I second what Ahmet said - thanks for the pointers. Python 3.7 support can 
> come later down the road.
>
> Thanks,
> Valentyn
>
> > On Fri, Sep 28, 2018 at 11:17 AM Ahmet Altay  wrote:
> > > Hi Manu,
> > >
> > > Currently, we use Python 3.5.2 on Jenkins for testing. Python tests print 
> > > out the python version in the console logs and I found this information 
> > > from one of the logs [1]. The initial proposal for Python 3 support was
> > > to support a specific version of Python 3 during the porting process and
> > > later on work to add support for additional versions [2]. (Also note
> > > that Python 3.7 was released about 3 months ago, after the porting
> > > effort started.)
> > >
> > > Hope this helps.
> > >
> > > Ahmet
> > >
> > > [1] 
> > > https://builds.apache.org/view/A-D/view/Beam/job/beam_PostCommit_Python_Verify/6114/consoleFull
> > > [2] 
> > > https://lists.apache.org/thread.html/5371469de567357b1431606f766217ef73a9098dc45046f51a6ecceb@%3Cdev.beam.apache.org%3E
> > >
> > > > On Thu, Sep 27, 2018 at 10:09 PM, Manu Zhang  
> > > > wrote:
> > > > > Hi Valentyn,
> > > > >
> > > > > I'm aware there is a Python 3 environment and have worked on the
> > > > > options module. Yes, I'd love to contribute more.
> > > > > The issue I raise here is specifically about Python 3.7, where the
> > > > > dependency on the typing library would fail all the tests.
> > > > > Do you know which version of Python 3 is set up for our tests?
> > > > >
> > > > > Manu
> > > > >
> > > > > > On Fri, Sep 28, 2018 at 8:02 AM Valentyn Tymofieiev 
> > > > > >  wrote:
> > > > > > > Hi Manu,
> > > > > > >
> > > > > > > We have added a Python 3 environment to our tests, see [1], and we
> > > > > > > are actively making changes to Beam code to make it Python
> > > > > > > 3-compatible. We are enabling tests module by module, although we
> > > > > > > have to disable some of the tests initially when the failures are
> > > > > > > introduced by other modules.
> > > > > > >
> > > > > > > I think @RobbeSneyders is currently working on typehints package 
> > > > > > > specifically, as per our Kanban board [2].
> > > > > > >
> > > > > > > If you (or anyone else) is interested in helping with Python 3 
> > > > > > > support, and has cycles to actively work on it now, please reach 
> > > > > > > out - I would be happy to coordinate the effort, and help with 
> > > > > > > code reviews.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Valentyn
> > > > > > >
> > > > > > > [1]  
> > > > > > > https://github.com/apache/beam/blob/5d298db4c20bbb8876a5b75142341332c1e3fb8d/sdks/python/tox.ini#L56
> > > > > > > [2]  
> > > > > > > https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=245&view=detail
> > > > > > >
> > > > > > > > On Thu, Sep 27, 2018 at 3:52 PM Manu Zhang 
> > > > > > > >  wrote:
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I failed to run Python tests in 3.7 with the following error.
> > > > > > > > >
> > > > > > > > >   File 
> > > > > > > > > "/Users/doria/git/incubator-beam/sdks/python/apache_beam/typehints/native_type_compatibility.py",
> > > > > > > > >  line 23, in 
> > > > > > > > >     import typing
> > > > > > > > >   File 
> > > > > > > > > "/Users/doria/git/incubator-beam/sdks/python/.eggs/typing-3.6.6-py3.7.egg/typing.py",
> > > > > > > > >  line 1356, in 
> > > > > > > > >     class Callable(extra=collections_abc.Callable, 
> > > > > > > > > metaclass=CallableMeta):
> > > > > > > > >   File 
> > > > > > > > > "/Users/doria/git/incubator-beam/sdks/python/.eggs/typing-3.6.6-py3.7.egg/typing.py",
> > > > > > > > >  line 1004, in __new__
> > > > > > > > >     self._abc_registry = extra._abc_registry
> > > > > > > > > AttributeError: type object 'Callable' has no attribute 
> > > > > > > > > '_abc_registry'
> > > > > > > > >
> > > > > > > > > This is because the required typing library (the PyPI backport) is
> > > > > > > > > incompatible with Python 3.7, where typing is no longer provisional.
> > > > > > > > >
> > > > > > > > > Any thoughts on this? Shall we add a Python 3.7 environment to
> > > > > > > > > our tests?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Manu Zhang
> > >


Re: Source or ParDo for reading data in PostgreSQL in Python

2018-10-02 Thread Jonathan Perron

Hello again,

Thanks to 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/estimate_pi.py, 
I saw that my first step must be a beam.Create() with an iterable. Doing 
so solved my problem.


Sorry for my mistake.
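
For anyone landing on this thread later, a minimal sketch of that fix, reusing the
placeholder names (pipeline_options, arg1/arg2, ReadEntityFromPostgresqlFn) from the
original message quoted below:

# Seed the pipeline with beam.Create() so the ParDo consumes a PCollection
# rather than a PBegin; the single tuple here just triggers one read.
import apache_beam as beam

p = beam.Pipeline(options=pipeline_options)
psql_entities = (
    p
    | "Seed" >> beam.Create([(arg1, arg2)])
    | "Extract Entities from PSQL backup" >> beam.ParDo(ReadEntityFromPostgresqlFn())
)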

On 2018/10/02 08:40:46, Jonathan Perron  wrote:
> Hello,
>
> I am looking for some way to access data stored in PostgreSQL and don't
> know if I should go for a Sink or ParDo operations. It is stated that
> ParDo could be used but I'm not sure this is what will solve my problem,
> so here I am! I managed to write in the database with only ParDo
> operations, so I guess it is also possible here.
>
> Some details about my use case:
>
> * The Python SDK is used;
>
> * Reading in the database is the first operation of the pipeline before
> making some calculation;
>
> * It is performed with SQLAlchemy, but could also be done with psycopg2;
>
> * I don't think parallelizing this operation is necessary as the queries
> are and will stay really simple (i.e. SELECT foo FROM bar WHERE fuzz).
>
> Here are my DoFn classes:
>
> class ExtractFromPostgreSQLFn(beam.DoFn):
>     """
>     Extract PCollection from PostgreSQL
>     """
>
>     def start_bundle(self):
>         self._session = Session()
>
>     def process(self, element):
>         raise NotImplementedError
>
>     def finish_bundle(self):
>         self._session.close()
>
>
> class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
>     def start_bundle(self, arg1, arg2):
>         super(ReadEntityFromPostgresqlFn, self).start_bundle()
>         self._arg1 = arg1
>         self._arg2 = arg2
>
>     def process(self, element):
>         entities = (
>             self._session.query(Entity)
>             .filter(Entity.arg1 == self._arg1)
>             .filter(Entity.arg2 == self._arg2)
>             .all()
>         )
>         yield (self._arg1, self._arg2)
>
> As I said, I used it just after the initialization of the pipeline:
>
> p = beam.Pipeline(options=pipeline_options)
> psql_entities = p | "Extract Entities from PSQL backup" >> beam.ParDo(ReadDatastoreUsersFromPostgresqlFn())
>
> Unfortunately, I end up with an AttributeError: 'PBegin' object has no
> attribute 'windowing' error.
>
> Where did I make a mistake? I take every input you could provide me on
> this topic.
>
> Thanks for your time,
>
> Jonathan


Re: Source or ParDo for reading data in PostgreSQL in Python

2018-10-02 Thread Pascal Gula
Hi Jonathan,
I had a similar requirement to yours, but for MongoDB, and I tentatively
wrote an IO connector for it that you can find here:
https://github.com/PEAT-AI/beam-extended
It is working with the DirectRunner in Read mode (I still need to test it
on Dataflow).
But I faced some issues with the connector's serializability in Write mode.
Long story short, pymongo is not serializable, and this is something you
have to validate for SQLAlchemy.
Generally speaking, take a look at this part of the doc if you want to know
more about IO connector development:
https://beam.apache.org/documentation/sdks/python-custom-io/
I am not an expert but I'll be glad to help you since I would also need to
support PostgreSQL in the near future!
Cheers,
Pascal


Pascal Gula
about.me/metanov



On Tue, 2 Oct 2018 at 10:41, Jonathan Perron 
wrote:

> Hello,
>
> I am looking for some way to access data stored in PostgreSQL and don't
> know if I should go for a Sink or ParDo operations. It is stated that ParDo
> could be used but I'm not sure this is what will solve my problem, so here
> I am! I managed to write in the database with only ParDo operations, so I
> guess it is also possible here.
>
> Some details about my use case:
>
> * The Python SDK is used;
>
> * Reading in the database is the first operation of the pipeline before
> making some calculation;
>
> * It is performed with SQLAlchemy, but could also be done with psycopg2;
>
> * I don't think parallelizing this operation is necessary as the queries are
> and will stay really simple (i.e. SELECT foo FROM bar WHERE fuzz).
>
> Here are my DoFn classes:
>
> class ExtractFromPostgreSQLFn(beam.DoFn):
>     """
>     Extract PCollection from PostgreSQL
>     """
>
>     def start_bundle(self):
>         self._session = Session()
>
>     def process(self, element):
>         raise NotImplementedError
>
>     def finish_bundle(self):
>         self._session.close()
>
>
> class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
>     def start_bundle(self, arg1, arg2):
>         super(ReadEntityFromPostgresqlFn, self).start_bundle()
>         self._arg1 = arg1
>         self._arg2 = arg2
>
>     def process(self, element):
>         entities = (
>             self._session.query(Entity)
>             .filter(Entity.arg1 == self._arg1)
>             .filter(Entity.arg2 == self._arg2)
>             .all()
>         )
>         yield (self._arg1, self._arg2)
>
> As I said, I used it just after the initialization of the pipeline:
>
>
> p = beam.Pipeline(options=pipeline_options)
> psql_entities = p | "Extract Entities from PSQL backup" >> beam.ParDo(ReadDatastoreUsersFromPostgresqlFn())
>
> Unfortunately, I end up with an *AttributeError: 'PBegin' object has no
> attribute 'windowing'* error.
>
> Where did I make a mistake ? I take every input you could provide me on
> this topic.
>
> Thanks for your time,
>
> Jonathan
>
>
>

-- 

Pascal Gula
Senior Data Engineer / Scientist
+49 (0)176 34232684 | www.plantix.net
PEAT GmbH
Kastanienallee 4
10435 Berlin // Germany
Download the App!


Source or ParDo for reading data in PostgreSQL in Python

2018-10-02 Thread Jonathan Perron

Hello,

I am looking for some way to access data stored in PostgreSQL and don't 
know if I should go for a Sink or ParDo operations. It is stated that 
ParDo could be used but I'm not sure this is what will solve my problem, 
so here I am! I managed to write in the database with only ParDo
operations, so I guess it is also possible here.


Some details about my use case:

* The Python SDK is used;

* Reading in the database is the first operation of the pipeline before 
making some calculation;


* It is performed with SQLAlchemy, but could also be done with psycopg2;

* I don't think parallelizing this operation is necessary as the queries
are and will stay really simple (i.e. SELECT foo FROM bar WHERE fuzz).


Here are my DoFn classes:

class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract PCollection from PostgreSQL
    """

    def start_bundle(self):
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def start_bundle(self, arg1, arg2):
        super(ReadEntityFromPostgresqlFn, self).start_bundle()
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        yield (self._arg1, self._arg2)

As I said, I used it just after the initialization of the pipeline:

p = beam.Pipeline(options=pipeline_options)
psql_entities = p | "Extract Entities from PSQL backup" >> beam.ParDo(ReadDatastoreUsersFromPostgresqlFn())


Unfortunately, I end up with an AttributeError: 'PBegin' object has no
attribute 'windowing' error.


Where did I make a mistake? I take every input you could provide me on
this topic.


Thanks for your time,

Jonathan