Re: SplittableDoFn
I came up with a proposal[1] for a progress model based solely on the backlog, where splits are expressed as the remaining backlog we want the SDK to split at. I also give recommendations to runner authors as to how an autoscaling system could work based upon the measured backlog. Many past discussions around progress reporting and splitting have centered on finding an optimal solution; after reading a lot about work stealing, I don't believe there is a general solution, and it really is up to SplittableDoFns to be well behaved. I did not do much work in classifying what a well-behaved SplittableDoFn is, though. Much of this work builds off ideas that Eugene had documented in the past[2]. I could use the community's wide knowledge of different I/Os to see if computing the backlog is practical in the way that I'm suggesting and to gather people's feedback. If there is a lot of interest, I would like to hold a community video conference between Sept 10th and 14th about this topic. Please reply with your availability by Sept 6th if you're interested. 1: https://s.apache.org/beam-bundles-backlog-splitting 2: https://s.apache.org/beam-breaking-fusion On Mon, Aug 13, 2018 at 10:21 AM Jean-Baptiste Onofré wrote: > Awesome ! > > Thanks Luke ! > > I plan to work with you and others on this one. > > Regards > JB > On Aug 13, 2018, at 19:14, Lukasz Cwik wrote: >> >> I wanted to reach out that I will be continuing from where Eugene left >> off with SplittableDoFn. I know that many of you have done a bunch of work >> with IOs and/or runner integration for SplittableDoFn and would appreciate >> your help in advancing this awesome idea. If you have questions or things >> you want to get reviewed related to SplittableDoFn, feel free to send them >> my way or include me on anything SplittableDoFn related. >> >> I was part of several discussions with Eugene and I think the biggest >> outstanding design portion is to figure out how dynamic work rebalancing >> would play out with the portability APIs. This includes reporting of >> progress from within a bundle. I know that Eugene had shared some documents >> in this regard but the position / split models didn't work too cleanly in a >> unified sense for bounded and unbounded SplittableDoFns. It will likely >> take me a while to gather my thoughts but could use your expertise as to how >> compatible these ideas are with respect to IOs and runners >> Flink/Spark/Dataflow/Samza/Apex/... and obviously help during >> implementation. >>
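To make the idea concrete, here is a minimal sketch of what a backlog-oriented splitting contract could look like. All names and signatures below are illustrative assumptions on my part, not the API from [1]:

    // Hypothetical sketch of a backlog-based progress/splitting contract.
    public interface BacklogReportingTracker<RestrictionT> {
      // Estimate of the work remaining in the current restriction, in
      // units the tracker defines (e.g. bytes, messages, outstanding keys).
      double getBacklog();

      // Try to split so that roughly backlogToKeep units of work remain in
      // the primary restriction; the rest becomes a residual restriction
      // that the runner can reschedule elsewhere.
      RestrictionT trySplitAtBacklog(double backlogToKeep);
    }

Under such a model a runner can drive autoscaling from the sum of reported backlogs and phrase split requests in units of remaining backlog rather than absolute positions.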
Re: jira search in chrome omnibox
Correction: this is the correct URL: https://issues.apache.org/jira/secure/QuickSearch.jspa?searchString=%s It uses smart querying. Ex: searching for "beam open pubsub" will search for open bugs in project BEAM with the keyword "pubsub". On Tue, Aug 28, 2018 at 4:49 PM Valentyn Tymofieiev wrote: > Thanks for sharing. > > I have also found the following custom search query useful for PRs: > https://github.com/apache/beam/pulls?q=is%3Apr%20%s > > Sample usage: type 'pr', space, type: 'author:tvalentyn'. > > You could also incorporate 'author:' into the query: > https://github.com/apache/beam/pulls?q=is%3Apr%20author%3A > > On Tue, Aug 28, 2018 at 4:26 PM Daniel Oliveira > wrote: > >> This seems pretty useful. Thanks Udi! >> >> On Mon, Aug 27, 2018 at 3:54 PM Udi Meiri wrote: >> >>> In case you want to quickly look up JIRA tickets, e.g., typing 'j', >>> space, 'BEAM-4696'. >>> Search URL: >>> https://issues.apache.org/jira/QuickSearch.jspa?searchString=%s
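For anyone wiring this up: in Chrome's search-engine settings (chrome://settings/searchEngines), add an entry with a keyword such as 'j' and the URL above; Chrome substitutes whatever you type after the keyword for %s. For example, typing 'j beam open pubsub' should expand to something like: https://issues.apache.org/jira/secure/QuickSearch.jspa?searchString=beam%20open%20pubsub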
Re: builds.apache.org refused connections since last night
Hey Thomas, I guess a committer can push changes directly to https://gitbox.apache.org/repos/asf?p=beam-site.git. Maybe you can give it a try with a committer's help. On Thu, Aug 30, 2018 at 10:34 AM Thomas Weise wrote: > While Jenkins is down, is there an alternative process to merge web site > changes? > > > On Wed, Aug 29, 2018 at 9:19 AM Boyuan Zhang wrote: > >> Thank you Andrew! >> >> On Wed, Aug 29, 2018 at 9:17 AM Andrew Pilloud >> wrote: >> >>> Down for me too. It sounds like the disk failed and it will be down for >>> a while: https://status.apache.org/ >>> >>> Andrew >>> >>> On Wed, Aug 29, 2018 at 9:13 AM Boyuan Zhang wrote: Hey all, It seems that builds.apache.org cannot be reached since last night. Does anyone have the same connection problem? Any idea what we can do? Boyuan Zhang >>>
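For reference, a sketch of what such a direct push could look like (the asf-site branch name is an assumption about how beam-site is published):

    git clone https://gitbox.apache.org/repos/asf/beam-site.git
    cd beam-site
    # commit the website change, then:
    git push origin asf-site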
Re: builds.apache.org refused connections since last night
While Jenkins is down, is there an alternative process to merge web site changes? On Wed, Aug 29, 2018 at 9:19 AM Boyuan Zhang wrote: > Thank you Andrew! > > On Wed, Aug 29, 2018 at 9:17 AM Andrew Pilloud > wrote: > >> Down for me too. It sounds like the disk failed and it will be down for a >> while: https://status.apache.org/ >> >> Andrew >> >> On Wed, Aug 29, 2018 at 9:13 AM Boyuan Zhang wrote: >> >>> Hey all, >>> >>> It seems that builds.apache.org cannot be reached since last night. >>> Does anyone have the same connection problem? Any idea what we can do? >>> >>> Boyuan Zhang >>> >>
Re: Status of IntelliJ with Gradle
Small update, it helps to add the following to the IntelliJ properties: Help -> Edit Custom Properties idea.max.intellisense.filesize=5000 This gets rid of the errors due to large generated source files, e.g. RunnerApi.java. -Max On 22.08.18 23:26, Kai Jiang wrote: I encountered the same error as Xinyu when I was launching unit tests in IntelliJ. For now, I am only using Gradle to run unit tests. Thanks, Kai On 2018/08/22 21:11:06, Xinyu Liu wrote: We experienced the same issues too in IntelliJ after switching to the latest version. I did the trick Luke mentioned before to include the beam-model-fn-execution and beam-model-job-management jars in the dependent modules to get around compilation. But I cannot get the vendored protobuf working. It seems the RunnerApi is using the original protobuf package, and it causes confusion in IntelliJ if I add the relocated jar. As a result, I have to run and debug only using Gradle for now. Thanks, Xinyu On Wed, Aug 22, 2018 at 1:45 AM, Maximilian Michels wrote: Thanks Lukasz. I also found that I can never fix all import errors by manually adding jars to the IntelliJ library list. It is also not a good solution because it breaks on reloading the Gradle project. New contributors might find the errors in IntelliJ distracting. Even worse, they might assume the problem is on their side. If we can't fix them soon, I'd suggest documenting the IntelliJ limitations in the contributor guide. On 20.08.18 17:58, Lukasz Cwik wrote: Yes, I have the same issues with vendoring. These are the things that I have tried without success to get IntelliJ to import the vendored modules correctly: * attempted to modify the idea.module.scopes to only include the vendored artifacts (for some reason this is ignored and IntelliJ is relying on the output of its own internal module; nothing I add to the scopes seems to impact anything) * modify the generated iml beforehand to add the vendored jar file as the top dependency (the jar never appears in the module's dependencies) On Mon, Aug 20, 2018 at 8:36 AM Maximilian Michels wrote: Thank you Etienne for opening the issue. Anyone else having problems with the shaded Protobuf dependency? On 20.08.18 16:14, Etienne Chauchot wrote: > Hi Max, > > I experienced the same. I had first opened a general ticket > (https://issues.apache.org/jira/browse/BEAM-4418) about Gradle > improvements and I just split it into several tickets. Here is the one > concerning the same issue: https://issues.apache.org/jira/browse/BEAM-5176 > > Etienne > > On Mon, Aug 20, 2018 at 15:51 +0200, Maximilian Michels wrote: >> Hi Beamers, >> >> It's great to see the Beam build system overhauled. Thank you for all >> the hard work. >> >> That said, I've just started contributing to Beam again and I feel >> really stupid for not having a fully-functional IDE. I've closely >> followed the IntelliJ/Gradle instructions [1]. In the terminal >> everything works fine. >> >> First of all, I get warnings like the following and the build fails: >> >> .../beam/sdks/java/core/src/main/java/org/apache/beam/sdk/package-info.java:29: >> warning: [deprecation] NonNull in edu.umd.cs.findbugs.annotations has >> been deprecated >> @DefaultAnnotation(NonNull.class) >> ^ >> error: warnings found and -Werror specified >> 1 error >> 89 warnings >> >> Somehow the "-Xlint:-deprecation" compiler flag does not get through but >> "-Werror" does. I can get it to compile by removing the "-Werror" flag >> from BeamModulePlugin but that's obviously not the solution.
>> >> Further, once the build succeeds I still have to add the relocated >> Protobuf library manually because the one in "vendor" does not get >> picked up. I've tried clearing caches / re-adding the project / >> upgrading IntelliJ / changing Gradle configs. >> >> >> Is this just me or do you also have similar problems? If so, I would >> like to compile a list of possible fixes to help other contributors. >> >> >> Thanks, >> Max >> >> >> Tested with >> - IntelliJ 2018.1.6 and 2018.2.1. >> - MacOS >> - java version "1.8.0_112" >> >> [1] https://beam.apache.org/contribute/intellij/ >> >> -- Max
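For readers following the -Werror discussion above, a minimal Gradle sketch of the interaction (illustrative only, not the actual BeamModulePlugin code):

    // build.gradle sketch -- illustrative, not Beam's real configuration
    tasks.withType(JavaCompile) {
        // Suppress deprecation warnings...
        options.compilerArgs << '-Xlint:-deprecation'
        // ...and fail on any warning that remains. If an IDE import keeps
        // -Werror but drops the -Xlint flag, deprecation warnings such as
        // the @DefaultAnnotation(NonNull.class) one become hard errors.
        options.compilerArgs << '-Werror'
    }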
Re: [QUESTION] retrial in sources by the runners
Runners are responsible for retry semantics: they should catch the failure and choose whether to retry or not. I think your reading of the code is correct. Some I/O layers do retry, but that is more about attempting to continue processing within a bundle instead of failing and having the runner choose whether to retry the bundle. On Thu, Aug 30, 2018 at 5:57 AM Etienne Chauchot wrote: > Hi all, > > I have a question concerning retries of sources. I've looked at the code > of the direct runner and the spark runner on bounded sources. As far as I can tell, > if there is a failure in reading a record from the reader of the source, > there will be no retry from the runner; there will just be an exception > thrown. The only retry that can take place with the source is the one > that could be done by the reader itself. > > Can you confirm that I did not miss something? > > Etienne >
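To spell out the contract Luke describes, a rough sketch of hypothetical runner internals (Bundle, processBundle and maxBundleRetries are made-up names, not actual Beam code):

    // The runner, not the source, owns retries: it catches a failed bundle
    // and decides whether to re-execute it.
    void runBundleWithRetries(Bundle bundle, int maxBundleRetries) throws Exception {
      int attempts = 0;
      while (true) {
        try {
          processBundle(bundle); // invokes the source's reader; may throw
          return;
        } catch (Exception e) {
          if (++attempts >= maxBundleRetries) {
            throw e; // give up; surface the failure to the pipeline
          }
          // otherwise loop and re-execute the whole bundle
        }
      }
    }

A runner that never loops here simply fails on the first exception, which is consistent with Etienne's reading of the direct and spark runners.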
Re: Beam Schemas: current status
Max, Nested Pojos are fully supported, as are nested array/collection and map types (e.g. if your Pojo contains List). One limitation right now is that only mutable Pojos are supported. For example, the following Pojo would _not_ work, because the fields aren't mutable. public class Pojo { public final String field; } This is an annoying restriction, because in practice Pojo types often have final fields. The reason for the restriction is that the most general way to create an instance of this Pojo (after decoding) is to instantiate the object and then set the fields one by one (I also assume that there's a default constructor). I can remove this restriction if there is an appropriate constructor or builder interface that lets us construct the object directly. Reuven On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels wrote: > That's a cool feature. Are there any limitations for the schema > inference apart from being a Pojo/Bean? Does it support nested POJOs, > e.g. "wrapper.field"? > > -Max > > On 29.08.18 07:40, Reuven Lax wrote: > > I wanted to send a quick note to the community about the current status > > of schema-aware PCollections in Beam. As some might remember we had a > > good discussion last year about the design of these schemas, involving > > many folks from different parts of the community. I sent a summary > > earlier this year explaining how schemas has been integrated into the > > DoFn framework. Much has happened since then, and here are some of the > > highlights. > > > > First, I want to emphasize that all the schema-aware classes are > > currently marked @Experimental. Nothing is set in stone yet, so if you > > have questions about any decisions made, please start a discussion! > > > > SQL > > > > The first big milestone for schemas was porting all of BeamSQL to use > > the framework, which was done in pr/5956. This was a lot of work, > > exposed many bugs in the schema implementation, but now provides great > > evidence that schemas work! > > > > Schema inference > > > > Beam can automatically infer schemas from Java POJOs (objects with > > public fields) or JavaBean objects (objects with getter/setter methods). > > Often you can do this by simply annotating the class. For example: > > > > @DefaultSchema(JavaFieldSchema.class) > > public class UserEvent { > > public String userId; > > public LatLong location; > > Public String countryCode; > > public long transactionCost; > > public double transactionDuration; > > public List traceMessages; > > }; > > > > @DefaultSchema(JavaFieldSchema.class) > > public class LatLong { > > public double latitude; > > public double longitude; > > } > > > > Beam will automatically infer schemas for these classes! So if you have > > a PCollection, it will automatically get the following schema: > > > > UserEvent: > > userId: STRING > > location: ROW(LatLong) > > countryCode: STRING > > transactionCost: INT64 > > transactionDuration: DOUBLE > > traceMessages: ARRAY[STRING]] > > > > LatLong: > > latitude: DOUBLE > > longitude: DOUBLE > > > > Now it’s not always possible to annotate the class like this (you may > > not own the class definition), so you can also explicitly register this > > using Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans. > > > > Coders > > > > Beam has a built-in coder for any schema-aware PCollection, largely > > removing the need for users to care about coders. We generate low-level > > bytecode (using ByteBuddy) to implement the coder for each schema, so > > these coders are quite performant. This provides a better default coder > > for Java POJO objects as well. In the past users were recommended to use > > AvroCoder for pojos, which many have found inefficient. Now there’s a > > more-efficient solution. > > > > Utility Transforms > > > > Schemas are already useful for implementers of extensions such as SQL, > > but the goal was to use them to make Beam itself easier to use. To this > > end, I’ve been implementing a library of transforms that allow for easy > > manipulation of schema PCollections. So far Filter and Select are > > merged, Group is about to go out for review (it needs some more javadoc > > and unit tests), and Join is being developed but doesn’t yet have a > > final interface. > > > > Filter > > > > Given a PCollection, I want to keep only those in an area of > > southern manhattan. Well this is easy! > > > > PCollection manhattanEvents = allEvents.apply(Filter > > .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699) > > .whereFieldName("longitude", long -> long < -73.969 && long > -74.747)); > > > > Schemas along with lambdas allows us to write this transform > > declaratively. The Filter transform also allows you to register filter > > functions that operate on multiple fields at the same time. > > > > Select > > > > Let’s say
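To make the constructor idea concrete, a hypothetical example of what could lift the mutability restriction Reuven describes (illustrative only; as noted above, this is not supported yet):

    public class Pojo {
      public final String field;

      // A constructor covering every field would let the schema framework
      // build the instance in one step after decoding, instead of mutating
      // fields one by one on a default-constructed object.
      public Pojo(String field) {
        this.field = field;
      }
    }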
Re: Beam Schemas: current status
Thanks Reuven for the excellent summary, and thanks to everyone who worked on the Schema/SQL improvements. This is great for usability. I really like the idea of making the user experience simpler, e.g. by automatically inferring Coders. Some questions:
- Any plans to add similar improvements for the Python/Go SDKs?
- I suppose that deconstructing the element objects based on annotations happens in flight. Have you thought about a way to eventually push this into the previous transform (e.g. the IO) via some sort of push-down predicate? (I suppose this applies to jsonpath filters too, but would be more complex.)
- Do you think it makes sense to have ways for IOs to provide transforms to convert from/to Rows? I remember there was some work on this for the SQL. I am wondering how we can make the Schema/SQL experience even more friendly.
- What is the current intermediate representation? If I remember well, weren't Coders in part a way of hacking around the issues of determinism in Java Serialization? So if we use Java serialization (generated via ByteBuddy), wouldn't we have similar issues?
- Have you envisioned other ways to serialize apart from the generation via ByteBuddy? E.g. to make Row compatible with formats supported in multiple languages, e.g. protobuf, or some Arrow-like thing that we can just submit without reserialization and that can be decoded back (this would be great for portability).
- Any discussions on row ↔ JSON conversion? Sounds like a trivial / common case too.
Regards, Ismael On Thu, Aug 30, 2018 at 4:51 PM Reuven Lax wrote: > > Andrew - the @Experimental tag simply means that we are free to change the > interfaces without waiting for the next major Beam version. Once we are happy > to freeze these interfaces, we can drop the tag. > > On Wed, Aug 29, 2018 at 1:48 PM Andrew Pilloud wrote: >> >> The work you've done to generalize and expand Schemas has significantly >> simplified what we need to do for SQL, I hope they are valuable to everyone >> else too. What work remains before we can drop the Experimental designation? >> >> Andrew >> >> On Wed, Aug 29, 2018 at 5:31 AM Eugene Kirpichov >> wrote: >>> >>> Wow, this is really coming together, congratulations and thanks for the >>> great work! >>> >>> On Wed, Aug 29, 2018 at 1:40 AM Reuven Lax wrote: I wanted to send a quick note to the community about the current status of schema-aware PCollections in Beam. As some might remember we had a good discussion last year about the design of these schemas, involving many folks from different parts of the community. I sent a summary earlier this year explaining how schemas has been integrated into the DoFn framework. Much has happened since then, and here are some of the highlights. First, I want to emphasize that all the schema-aware classes are currently marked @Experimental. Nothing is set in stone yet, so if you have questions about any decisions made, please start a discussion! SQL The first big milestone for schemas was porting all of BeamSQL to use the framework, which was done in pr/5956. This was a lot of work, exposed many bugs in the schema implementation, but now provides great evidence that schemas work! Schema inference Beam can automatically infer schemas from Java POJOs (objects with public fields) or JavaBean objects (objects with getter/setter methods). Often you can do this by simply annotating the class.
For example: @DefaultSchema(JavaFieldSchema.class) public class UserEvent { public String userId; public LatLong location; Public String countryCode; public long transactionCost; public double transactionDuration; public List traceMessages; }; @DefaultSchema(JavaFieldSchema.class) public class LatLong { public double latitude; public double longitude; } Beam will automatically infer schemas for these classes! So if you have a PCollection, it will automatically get the following schema: UserEvent: userId: STRING location: ROW(LatLong) countryCode: STRING transactionCost: INT64 transactionDuration: DOUBLE traceMessages: ARRAY[STRING]] LatLong: latitude: DOUBLE longitude: DOUBLE Now it’s not always possible to annotate the class like this (you may not own the class definition), so you can also explicitly register this using Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans. Coders Beam has a built-in coder for any schema-aware PCollection, largely removing the need for users to care about coders. We generate low-level bytecode (using ByteBuddy) to implement
Re: Beam Schemas: current status
Andrew - the @Experimental tag simply means that we are free to change the interfaces without waiting for the next major Beam version. Once we are happy to freeze these interfaces, we can drop the tag. On Wed, Aug 29, 2018 at 1:48 PM Andrew Pilloud wrote: > The work you've done to generalize and expand Schemas has significantly > simplified what we need to do for SQL, I hope they are valuable to everyone > else too. What work remains before we can drop the Experimental designation? > > Andrew > > On Wed, Aug 29, 2018 at 5:31 AM Eugene Kirpichov > wrote: > >> Wow, this is really coming together, congratulations and thanks for the >> great work! >> >> On Wed, Aug 29, 2018 at 1:40 AM Reuven Lax wrote: >> >>> I wanted to send a quick note to the community about the current status >>> of schema-aware PCollections in Beam. As some might remember we had a good >>> discussion last year about the design of these schemas, involving many >>> folks from different parts of the community. I sent a summary earlier this >>> year explaining how schemas has been integrated into the DoFn framework. >>> Much has happened since then, and here are some of the highlights. >>> >>> First, I want to emphasize that all the schema-aware classes are >>> currently marked @Experimental. Nothing is set in stone yet, so if you have >>> questions about any decisions made, please start a discussion! >>> >>> SQL >>> >>> The first big milestone for schemas was porting all of BeamSQL to use >>> the framework, which was done in pr/5956. This was a lot of work, exposed >>> many bugs in the schema implementation, but now provides great evidence >>> that schemas work! >>> >>> Schema inference >>> >>> Beam can automatically infer schemas from Java POJOs (objects with >>> public fields) or JavaBean objects (objects with getter/setter methods). >>> Often you can do this by simply annotating the class. For example: >>> >>> @DefaultSchema(JavaFieldSchema.class) >>> >>> public class UserEvent { >>> >>> public String userId; >>> >>> public LatLong location; >>> >>> Public String countryCode; >>> >>> public long transactionCost; >>> >>> public double transactionDuration; >>> >>> public List traceMessages; >>> >>> }; >>> >>> @DefaultSchema(JavaFieldSchema.class) >>> >>> public class LatLong { >>> >>> public double latitude; >>> >>> public double longitude; >>> >>> } >>> >>> Beam will automatically infer schemas for these classes! So if you have >>> a PCollection, it will automatically get the following schema: >>> >>> UserEvent: >>> >>> userId: STRING >>> >>> location: ROW(LatLong) >>> >>> countryCode: STRING >>> >>> transactionCost: INT64 >>> >>> transactionDuration: DOUBLE >>> >>> traceMessages: ARRAY[STRING]] >>> >>> >>> LatLong: >>> >>> latitude: DOUBLE >>> >>> longitude: DOUBLE >>> >>> Now it’s not always possible to annotate the class like this (you may >>> not own the class definition), so you can also explicitly register this >>> using Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans. >>> >>> Coders >>> >>> Beam has a built-in coder for any schema-aware PCollection, largely >>> removing the need for users to care about coders. We generate low-level >>> bytecode (using ByteBuddy) to implement the coder for each schema, so these >>> coders are quite performant. This provides a better default coder for Java >>> POJO objects as well. In the past users were recommended to use AvroCoder >>> for pojos, which many have found inefficient. Now there’s a more-efficient >>> solution. 
>>> >>> Utility Transforms >>> >>> Schemas are already useful for implementers of extensions such as SQL, >>> but the goal was to use them to make Beam itself easier to use. To this >>> end, I’ve been implementing a library of transforms that allow for easy >>> manipulation of schema PCollections. So far Filter and Select are merged, >>> Group is about to go out for review (it needs some more javadoc and unit >>> tests), and Join is being developed but doesn’t yet have a final interface. >>> >>> Filter >>> >>> Given a PCollection, I want to keep only those in an area of >>> southern manhattan. Well this is easy! >>> >>> PCollection manhattanEvents = allEvents.apply(Filter >>> >>> .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699) >>> >>> .whereFieldName("longitude", long -> long < -73.969 && long > -74.747 >>> )); >>> >>> Schemas along with lambdas allows us to write this transform >>> declaratively. The Filter transform also allows you to register filter >>> functions that operate on multiple fields at the same time. >>> >>> Select >>> >>> Let’s say that I don’t need all the fields in a row. For instance, I’m >>> only interested in the userId and traceMessages, and don’t care about the >>> location. In that case I can write the following: >>> >>> PCollection selected = allEvents.apply(Select.fieldNames(“userId”, >>> “traceMessages”)); >>> >>> >>> BTW, Beam also keeps track of which fields are accessed by a transform >>> In
Re: Beam Schemas: current status
Nice work Reuven!!! On Thu, Aug 30, 2018 at 6:57 AM Jean-Baptiste Onofré wrote: > Nice feature, thanks Reuven! > > I started to revamp the Spark runner with Dataset; I will leverage this! > > Regards > JB > > On 29/08/2018 07:40, Reuven Lax wrote: > > I wanted to send a quick note to the community about the current status > > of schema-aware PCollections in Beam. As some might remember we had a > > good discussion last year about the design of these schemas, involving > > many folks from different parts of the community. I sent a summary > > earlier this year explaining how schemas has been integrated into the > > DoFn framework. Much has happened since then, and here are some of the > > highlights. > > > > First, I want to emphasize that all the schema-aware classes are > > currently marked @Experimental. Nothing is set in stone yet, so if you > > have questions about any decisions made, please start a discussion! > > > > SQL > > > > The first big milestone for schemas was porting all of BeamSQL to use > > the framework, which was done in pr/5956. This was a lot of work, > > exposed many bugs in the schema implementation, but now provides great > > evidence that schemas work! > > > > Schema inference > > > > Beam can automatically infer schemas from Java POJOs (objects with > > public fields) or JavaBean objects (objects with getter/setter methods). > > Often you can do this by simply annotating the class. For example: > > > > @DefaultSchema(JavaFieldSchema.class) > > public class UserEvent { > > public String userId; > > public LatLong location; > > Public String countryCode; > > public long transactionCost; > > public double transactionDuration; > > public List traceMessages; > > }; > > > > @DefaultSchema(JavaFieldSchema.class) > > public class LatLong { > > public double latitude; > > public double longitude; > > } > > > > Beam will automatically infer schemas for these classes! So if you have > > a PCollection, it will automatically get the following schema: > > > > UserEvent: > > userId: STRING > > location: ROW(LatLong) > > countryCode: STRING > > transactionCost: INT64 > > transactionDuration: DOUBLE > > traceMessages: ARRAY[STRING]] > > > > LatLong: > > latitude: DOUBLE > > longitude: DOUBLE > > > > Now it’s not always possible to annotate the class like this (you may > > not own the class definition), so you can also explicitly register this > > using Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans. > > > > Coders > > > > Beam has a built-in coder for any schema-aware PCollection, largely > > removing the need for users to care about coders. We generate low-level > > bytecode (using ByteBuddy) to implement the coder for each schema, so > > these coders are quite performant. This provides a better default coder > > for Java POJO objects as well. In the past users were recommended to use > > AvroCoder for pojos, which many have found inefficient. Now there’s a > > more-efficient solution. > > > > Utility Transforms > > > > Schemas are already useful for implementers of extensions such as SQL, > > but the goal was to use them to make Beam itself easier to use. To this > > end, I’ve been implementing a library of transforms that allow for easy > > manipulation of schema PCollections. So far Filter and Select are > > merged, Group is about to go out for review (it needs some more javadoc > > and unit tests), and Join is being developed but doesn’t yet have a > > final interface. > > > > Filter > > > > Given a PCollection, I want to keep only those in an area of > > southern manhattan. Well this is easy! > > > > PCollection manhattanEvents = allEvents.apply(Filter > > .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699) > > .whereFieldName("longitude", long -> long < -73.969 && long > -74.747)); > > > > Schemas along with lambdas allows us to write this transform > > declaratively. The Filter transform also allows you to register filter > > functions that operate on multiple fields at the same time. > > > > Select > > > > Let’s say that I don’t need all the fields in a row. For instance, I’m > > only interested in the userId and traceMessages, and don’t care about > > the location. In that case I can write the following: > > > > PCollection selected = allEvents.apply(Select.fieldNames(“userId”, “traceMessages”)); > > > > BTW, Beam also keeps track of which fields are accessed by a transform. > > In the future we can automatically insert Selects in front of subgraphs > > to drop fields that are not referenced in that subgraph. > > > > Group > > > > Group is one of the more advanced transforms. In its most basic form, it > > provides a convenient way to group by key: > > > > PCollection> byUserAndCountry = > > allEvents.apply(Group.byFieldNames(“userId”, “countryCode”)); > > > > Notice how much more
Re: [Proposal] Creating a reproducible environment for Beam Jenkins Tests
Hi, That's interesting; however, it's really important to still be able to easily run tests locally, without any VM/Docker required. It should be something we can activate via a profile or similar. Regards JB On 27/08/2018 19:53, Yifan Zou wrote: > Hi, > > I have a proposal for creating a reproducible environment for Jenkins > tests by using a docker container. The thing is, the environment > configurations on the Beam Jenkins slaves are sometimes different from > developers' machines. Test failures on Jenkins may not be easy to > reproduce locally. Also, it is not convenient for developers to add or > modify the underlying tools installed on the Jenkins VMs, since they're managed > by Apache Infra. This proposal is aimed at addressing those problems. > > https://docs.google.com/document/d/1y0YuQj_oZXC0uM5-gniG7r9-5gv2uiDhzbtgYYJW48c/edit#heading=h.bg2yi0wbhl9n > > Any comments are welcome. Thank you. > > Regards. > Yifan > -- Jean-Baptiste Onofré jbono...@apache.org http://blog.nanthrax.net Talend - http://www.talend.com
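To illustrate the workflow the proposal enables, reproducing a Jenkins failure locally could look roughly like this (the image name here is hypothetical):

    # Run the same Gradle target the Jenkins job runs, inside the same image:
    docker run --rm -v "$PWD":/workspace -w /workspace beam-jenkins-env ./gradlew test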
Re: Beam Schemas: current status
Nice feature, thanks Reuven! I started to revamp the Spark runner with Dataset; I will leverage this! Regards JB On 29/08/2018 07:40, Reuven Lax wrote: > I wanted to send a quick note to the community about the current status > of schema-aware PCollections in Beam. As some might remember we had a > good discussion last year about the design of these schemas, involving > many folks from different parts of the community. I sent a summary > earlier this year explaining how schemas has been integrated into the > DoFn framework. Much has happened since then, and here are some of the > highlights. > > First, I want to emphasize that all the schema-aware classes are > currently marked @Experimental. Nothing is set in stone yet, so if you > have questions about any decisions made, please start a discussion! > > SQL > > The first big milestone for schemas was porting all of BeamSQL to use > the framework, which was done in pr/5956. This was a lot of work, > exposed many bugs in the schema implementation, but now provides great > evidence that schemas work! > > Schema inference > > Beam can automatically infer schemas from Java POJOs (objects with > public fields) or JavaBean objects (objects with getter/setter methods). > Often you can do this by simply annotating the class. For example: > > @DefaultSchema(JavaFieldSchema.class) > public class UserEvent { > public String userId; > public LatLong location; > Public String countryCode; > public long transactionCost; > public double transactionDuration; > public List traceMessages; > }; > > @DefaultSchema(JavaFieldSchema.class) > public class LatLong { > public double latitude; > public double longitude; > } > > Beam will automatically infer schemas for these classes! So if you have > a PCollection, it will automatically get the following schema: > > UserEvent: > userId: STRING > location: ROW(LatLong) > countryCode: STRING > transactionCost: INT64 > transactionDuration: DOUBLE > traceMessages: ARRAY[STRING]] > > LatLong: > latitude: DOUBLE > longitude: DOUBLE > > Now it’s not always possible to annotate the class like this (you may > not own the class definition), so you can also explicitly register this > using Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans. > > Coders > > Beam has a built-in coder for any schema-aware PCollection, largely > removing the need for users to care about coders. We generate low-level > bytecode (using ByteBuddy) to implement the coder for each schema, so > these coders are quite performant. This provides a better default coder > for Java POJO objects as well. In the past users were recommended to use > AvroCoder for pojos, which many have found inefficient. Now there’s a > more-efficient solution. > > Utility Transforms > > Schemas are already useful for implementers of extensions such as SQL, > but the goal was to use them to make Beam itself easier to use. To this > end, I’ve been implementing a library of transforms that allow for easy > manipulation of schema PCollections. So far Filter and Select are > merged, Group is about to go out for review (it needs some more javadoc > and unit tests), and Join is being developed but doesn’t yet have a > final interface. > > Filter > > Given a PCollection, I want to keep only those in an area of > southern manhattan. Well this is easy! > > PCollection manhattanEvents = allEvents.apply(Filter > .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699) > .whereFieldName("longitude", long -> long < -73.969 && long > -74.747)); > > Schemas along with lambdas allows us to write this transform > declaratively. The Filter transform also allows you to register filter > functions that operate on multiple fields at the same time. > > Select > > Let’s say that I don’t need all the fields in a row. For instance, I’m > only interested in the userId and traceMessages, and don’t care about > the location. In that case I can write the following: > > PCollection selected = allEvents.apply(Select.fieldNames(“userId”, “traceMessages”)); > > BTW, Beam also keeps track of which fields are accessed by a transform. > In the future we can automatically insert Selects in front of subgraphs > to drop fields that are not referenced in that subgraph. > > Group > > Group is one of the more advanced transforms. In its most basic form, it > provides a convenient way to group by key: > > PCollection> byUserAndCountry = > allEvents.apply(Group.byFieldNames(“userId”, “countryCode”)); > > Notice how much more concise this is than using GroupByKey directly! > > The Group transform really starts to shine however when you start > specifying aggregations. You can aggregate any field (or fields) and > build up an output schema based on these aggregations. For example: > > PCollection> aggregated = allEvents.apply(
Re: Beam Schemas: current status
That's a cool feature. Are there any limitations for the schema inference apart from being a Pojo/Bean? Does it support nested POJOs, e.g. "wrapper.field"? -Max On 29.08.18 07:40, Reuven Lax wrote: I wanted to send a quick note to the community about the current status of schema-aware PCollections in Beam. As some might remember we had a good discussion last year about the design of these schemas, involving many folks from different parts of the community. I sent a summary earlier this year explaining how schemas has been integrated into the DoFn framework. Much has happened since then, and here are some of the highlights. First, I want to emphasize that all the schema-aware classes are currently marked @Experimental. Nothing is set in stone yet, so if you have questions about any decisions made, please start a discussion! SQL The first big milestone for schemas was porting all of BeamSQL to use the framework, which was done in pr/5956. This was a lot of work, exposed many bugs in the schema implementation, but now provides great evidence that schemas work! Schema inference Beam can automatically infer schemas from Java POJOs (objects with public fields) or JavaBean objects (objects with getter/setter methods). Often you can do this by simply annotating the class. For example: @DefaultSchema(JavaFieldSchema.class) public class UserEvent { public String userId; public LatLong location; Public String countryCode; public long transactionCost; public double transactionDuration; public List traceMessages; }; @DefaultSchema(JavaFieldSchema.class) public class LatLong { public double latitude; public double longitude; } Beam will automatically infer schemas for these classes! So if you have a PCollection, it will automatically get the following schema: UserEvent: userId: STRING location: ROW(LatLong) countryCode: STRING transactionCost: INT64 transactionDuration: DOUBLE traceMessages: ARRAY[STRING]] LatLong: latitude: DOUBLE longitude: DOUBLE Now it’s not always possible to annotate the class like this (you may not own the class definition), so you can also explicitly register this using Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans. Coders Beam has a built-in coder for any schema-aware PCollection, largely removing the need for users to care about coders. We generate low-level bytecode (using ByteBuddy) to implement the coder for each schema, so these coders are quite performant. This provides a better default coder for Java POJO objects as well. In the past users were recommended to use AvroCoder for pojos, which many have found inefficient. Now there’s a more-efficient solution. Utility Transforms Schemas are already useful for implementers of extensions such as SQL, but the goal was to use them to make Beam itself easier to use. To this end, I’ve been implementing a library of transforms that allow for easy manipulation of schema PCollections. So far Filter and Select are merged, Group is about to go out for review (it needs some more javadoc and unit tests), and Join is being developed but doesn’t yet have a final interface. Filter Given a PCollection, I want to keep only those in an area of southern manhattan. Well this is easy! PCollection manhattanEvents = allEvents.apply(Filter .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699) .whereFieldName("longitude", long -> long < -73.969 && long > -74.747)); Schemas along with lambdas allows us to write this transform declaratively. The Filter transform also allows you to register filter functions that operate on multiple fields at the same time. Select Let’s say that I don’t need all the fields in a row. For instance, I’m only interested in the userId and traceMessages, and don’t care about the location. In that case I can write the following: PCollection selected = allEvents.apply(Select.fieldNames(“userId”, “traceMessages”)); BTW, Beam also keeps track of which fields are accessed by a transform. In the future we can automatically insert Selects in front of subgraphs to drop fields that are not referenced in that subgraph. Group Group is one of the more advanced transforms. In its most basic form, it provides a convenient way to group by key: PCollection> byUserAndCountry = allEvents.apply(Group.byFieldNames(“userId”, “countryCode”)); Notice how much more concise this is than using GroupByKey directly! The Group transform really starts to shine however when you start specifying aggregations. You can aggregate any field (or fields) and build up an output schema based on these aggregations. For example: PCollection> aggregated = allEvents.apply( Group.byFieldNames(“userId”, “countryCode”) .aggregateField("cost", Sum.ofLongs(), "total_cost") .aggregateField("cost", Top.largestFn(10), “top_purchases”) .aggregateField("transactionDuration", ApproximateQuantilesCombineFn.create(21),
Re: [Proposal] Creating a reproducible environment for Beam Jenkins Tests
Hi Yifan, Thanks for the proposal. I like the idea of unifying test environments via Docker. It would be great if we could still easily run tests without Docker. Best, Max On 27.08.18 19:53, Yifan Zou wrote: Hi, I have a proposal for creating a reproducible environment for Jenkins tests by using docker container. The thing is, the environment configurations on Beam Jenkins slaves are sometimes different from developer's machines. Test failures on Jenkins may not be easy to reproduce locally. Also, it is not convenient for developers to add or modify underlying tools installed on Jenkins VMs, since they're managed by Apache Infra. This proposal is aimed to address those problems. https://docs.google.com/document/d/1y0YuQj_oZXC0uM5-gniG7r9-5gv2uiDhzbtgYYJW48c/edit#heading=h.bg2yi0wbhl9n Any comments are welcome. Thank you. Regards. Yifan
Re: Accessing attempted metrics from within a DoFn
Robin, I asked myself the same thing, and indeed there is no way of accessing the metrics from within the pipeline itself. The only access you can have is directly to the MetricCell, as in [1], but that is runner-facing, not user-facing, so it does not help for your test. So I agree with you: such a job handler would be useful. [1] https://github.com/apache/beam/blob/6497b0b24888178441f05d79ab101dc40760388b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java#L95 Best, Etienne On Wed, Aug 29, 2018 at 10:43 -0700, Robin Qiu wrote: > Hi everyone, > > I am writing a test [1] for the support of the @RequiresStableInput annotation in > the Java SDK [2]. In the test I need to have > a ParDo make some side effects and then fail. Currently, the only way to > persist the side effect during retries > without depending on external states is using attempted metrics. However, > attempted metrics cannot be accessed from > within a DoFn. So I am wondering if we should make an improvement on this by, > for example, exposing a job handler to > DoFn. > > As Luke mentioned in another email thread, this improvement will also be > useful since: > * cancelling a pipeline from within the pipeline is useful > * starting a new job against the existing runner from within a pipeline is useful > * accessing attempted metrics to test DoFn's with side effects is useful for > error handling testing > > What do you think? All suggestions are appreciated. > > Best, > Robin > > [1] https://github.com/apache/beam/pull/6220 > [2] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM/edit# > >
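For context, a DoFn can already update metrics via the org.apache.beam.sdk.metrics API; the gap Robin and Etienne describe is reading the attempted values back. A minimal sketch:

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    public class SideEffectFn extends DoFn<String, String> {
      // Updating a metric from inside the DoFn works fine...
      private final Counter attempts = Metrics.counter(SideEffectFn.class, "attempts");

      @ProcessElement
      public void processElement(ProcessContext c) {
        attempts.inc();
        // ...but there is no handle here to query the attempted count;
        // reading metrics currently requires a PipelineResult, which only
        // exists outside the running pipeline.
        c.output(c.element());
      }
    }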
[QUESTION] retrial in sources by the runners
Hi all, I have a question concerning retries of sources. I've looked at the code of the direct runner and the spark runner on bounded sources. As far as I can tell, if there is a failure in reading a record from the reader of the source, there will be no retry from the runner; there will just be an exception thrown. The only retry that can take place with the source is the one that could be done by the reader itself. Can you confirm that I did not miss something? Etienne
Re: Beam Schemas: current status
Very impressive, thanks for your work Reuven! Etienne On Tue, Aug 28, 2018 at 22:40 -0700, Reuven Lax wrote: > I wanted to send a quick note to the community about the current status of > schema-aware PCollections in Beam. As some > might remember we had a good discussion last year about the design of these > schemas, involving many folks from > different parts of the community. I sent a summary earlier this year > explaining how schemas has been integrated into > the DoFn framework. Much has happened since then, and here are some of the > highlights. > First, I want to emphasize that all the schema-aware classes are currently > marked @Experimental. Nothing is set in > stone yet, so if you have questions about any decisions made, please start a > discussion! > SQL: The first big milestone for schemas was porting all of BeamSQL to use the > framework, which was done in pr/5956. > This was a lot of work, exposed many bugs in the schema implementation, but > now provides great evidence that schemas > work! > Schema inference: Beam can automatically infer schemas from Java POJOs (objects > with public fields) or JavaBean objects > (objects with getter/setter methods). Often you can do this by simply > annotating the class. For example: > @DefaultSchema(JavaFieldSchema.class) public class UserEvent { public String > userId; public LatLong location; Public > String countryCode; public long transactionCost; public double > transactionDuration; public List > traceMessages; }; > @DefaultSchema(JavaFieldSchema.class) public class LatLong { public double > latitude; public double longitude; } > Beam will automatically infer schemas for these classes! So if you have a > PCollection, it will > automatically get the following schema: > UserEvent: userId: STRING location: ROW(LatLong) countryCode: STRING > transactionCost: INT64 transactionDuration: > DOUBLE traceMessages: ARRAY[STRING]] > LatLong: latitude: DOUBLE longitude: DOUBLE > Now it’s not always possible to annotate the class like this (you may not own > the class definition), so you can also > explicitly register this using Pipeline:getSchemaRegistry:registerPOJO, and > the same for JavaBeans. > Coders: Beam has a built-in coder for any schema-aware PCollection, largely > removing the need for users to care about > coders. We generate low-level bytecode (using ByteBuddy) to implement the > coder for each schema, so these coders are > quite performant. This provides a better default coder for Java POJO objects > as well. In the past users were > recommended to use AvroCoder for pojos, which many have found inefficient. > Now there’s a more-efficient solution. > Utility Transforms: Schemas are already useful for implementers of extensions > such as SQL, but the goal was to use them > to make Beam itself easier to use. To this end, I’ve been implementing a > library of transforms that allow for easy > manipulation of schema PCollections. So far Filter and Select are merged, > Group is about to go out for review (it > needs some more javadoc and unit tests), and Join is being developed but > doesn’t yet have a final interface. > Filter: Given a PCollection, I want to keep only those in an area of > southern manhattan. Well this is easy! > PCollection manhattanEvents = allEvents.apply(Filter > .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699) > .whereFieldName("longitude", long -> long < -73.969 && long > -74.747)); > Schemas along with lambdas allows us to write this transform declaratively. > The Filter transform also allows you to > register filter functions that operate on multiple fields at the same time. > Select: Let’s say that I don’t need all the fields in a row. For instance, I’m > only interested in the userId and > traceMessages, and don’t care about the location. In that case I can write > the following: > PCollection selected = allEvents.apply(Select.fieldNames(“userId”, > “traceMessages”)); > BTW, Beam also keeps track of which fields are accessed by a transform. In the > future we can automatically insert > Selects in front of subgraphs to drop fields that are not referenced in that > subgraph. > Group: Group is one of the more advanced transforms. In its most basic form, it > provides a convenient way to group by > key: > PCollection> byUserAndCountry = > allEvents.apply(Group.byFieldNames(“userId”, > “countryCode”)); > Notice how much more concise this is than using GroupByKey directly! > The Group transform really starts to shine however when you start specifying > aggregations. You can aggregate any field > (or fields) and build up an output schema based on these aggregations. For > example: > PCollection> aggregated = allEvents.apply( > Group.byFieldNames(“userId”, “countryCode”) > .aggregateField("cost", Sum.ofLongs(), "total_cost") > .aggregateField("cost", Top.largestFn(10), > “top_purchases”).aggregateField("transactionDuration", >
Re: delayed emit (timer) in py-beam?
FYI: the reference DirectRunner implementation of the Python user state and timers API is out for review: https://github.com/apache/beam/pull/6304 On Mon, Jul 30, 2018 at 3:57 PM Austin Bennett wrote: > Fantastic; thanks, Charles! > > > > On Mon, Jul 30, 2018 at 3:49 PM, Charles Chen wrote: > >> Hey Austin, >> >> This API is not yet implemented in the Python SDK. I am working on this >> feature: the next step from my end is to finish a reference implementation >> in the local DirectRunner. As you note, the doc at >> https://s.apache.org/beam-python-user-state-and-timers describes the >> design. >> >> You can track progress on the mailing list thread here: >> https://lists.apache.org/thread.html/51ba1a00027ad8635bc1d2c0df805ce873995170c75d6a08dfe21997@%3Cdev.beam.apache.org%3E >> >> Best, >> Charles >> >> On Mon, Jul 30, 2018 at 3:34 PM Austin Bennett < >> whatwouldausti...@gmail.com> wrote: >> >>> What's going on with timers and python? >>> >>> Am looking at building a pipeline (assuming another group in my company >>> will grant access to the Kafka topic): >>> >>> Kafka -> beam -> have beam wait 24 hours -> do transform(s) and emit a >>> record. If I read things correctly that's not currently possible in python >>> on beam. What all is needed? (trying to figure out whether that is >>> something that I am capable of and there is room for me to help with). >>> Looking for similar functionality to >>> https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/ >>> (though don't need alternate routing, nor is that example in python). >>> >>> >>> For example, I see: >>> https://beam.apache.org/blog/2017/08/28/timely-processing.html >>> >>> and tickets like: https://issues.apache.org/jira/browse/BEAM-4594 >>> >>> >>> >
Re: Proposal for Beam Python User State and Timer APIs
Another update: the reference DirectRunner implementation of the Python user state and timers API is out for review: https://github.com/apache/beam/pull/6304 On Mon, Jul 9, 2018 at 2:18 PM Charles Chen wrote: > An update: https://github.com/apache/beam/pull/5691 has been merged. I > hope to send out a reference implementation in the DirectRunner soon. On > the roadmap after that is work on the relevant portability interfaces here > so we can get this working on runners like Beam Python on Flink. > > On Wed, Jun 20, 2018 at 10:00 AM Charles Chen wrote: > >> An update on the implementation: I recently sent out the user-facing >> pipeline construction part of the API implementation out for review: >> https://github.com/apache/beam/pull/5691. >> >> On Tue, Jun 5, 2018 at 5:26 PM Charles Chen wrote: >> >>> Thanks everyone for contributing here. We've reached rough consensus on >>> the approach we should take with this API, and I've summarized this in the >>> new "Community consensus" sections I added to the doc ( >>> https://s.apache.org/beam-python-user-state-and-timers). I will begin >>> initial implementation of this API soon. >>> >>> On Wed, May 23, 2018 at 8:08 PM Thomas Weise wrote: >>> Nice proposal; it's exciting to see this about to be added to the SDK as it enables a set of more complex use cases. I also think that some of the content can later be repurposed as user documentation. Thanks, Thomas On Wed, May 23, 2018 at 11:49 AM, Charles Chen wrote: > Thanks everyone for the detailed comments and discussions. It looks > like by now, we mostly agree with the requirements and overall direction > needed for the API, though there is continuing discussion on specific > details. I want to highlight two new sections of the doc, which address > some discussions that have come up: > >- *Existing state and transactionality*: this section addresses >how we will address an existing transactionality inconsistency in the >existing Java API. ( > > https://docs.google.com/document/d/1GadEkAmtbJQjmqiqfSzGw3b66TKerm8tyn6TK4blAys/edit#heading=h.ofyl9jspiz3b >) >- *State for merging windows*: this section addresses how we will >deal with non-combinable state in conjunction with merging windows. ( > > https://docs.google.com/document/d/1GadEkAmtbJQjmqiqfSzGw3b66TKerm8tyn6TK4blAys/edit#heading=h.ctxkcgabtzpy >) > > Let me know any further comments and suggestions. > > On Tue, May 22, 2018 at 9:29 AM Kenneth Knowles > wrote: > >> Nice. I know that Java users have found it helpful to have this >> lower-level way of writing pipelines when the high-level primitives don't >> quite have the tight control they are looking for. I hope it will be a >> big >> draw for Python, too. >> >> (commenting on the doc) >> >> Kenn >> >> On Mon, May 21, 2018 at 5:15 PM Charles Chen wrote: >> >>> I want to share a proposal for adding user state and timer support >>> to the Beam Python SDK and get the community's thoughts on how such an >>> API >>> should look: https://s.apache.org/beam-python-user-state-and-timers >>> >>> Let me know what you think and please add any comments and >>> suggestions you may have. >>> >>> Best, >>> Charles >>> >>
[BEAM-960] Backoff in the DirectRunner if no work is available
Hi beamers, I would like to contribute a fix for the following issue: - https://issues.apache.org/jira/browse/BEAM-690 The corresponding PR: - https://github.com/apache/beam/pull/6303 I tried to follow the approach suggested in the comments of the said ticket, and any feedback is appreciated. This is my first attempt to contribute to Beam code, so please bear with me if I missed something important. To give you some background on why I'm fixing this: I understand that performance-related issues in the direct runner generally receive low priority, as the whole direct runner concept is not meant for production deployment. Yet I think this issue should receive reasonable attention, because even when the direct runner is only used in testing/CI pipelines, the increased CPU consumption may materialize in the form of a higher bill from your favourite cloud provider. That is definitely our case, and it makes this issue a high-priority one for us. Regards, Vojta
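For readers who don't want to open the PR, the usual shape of such a fix is an exponential backoff in the work-polling loop; a rough sketch of the idea (hypothetical names, not the actual PR code):

    void pumpWork() throws InterruptedException {
      long backoffMillis = 1;
      while (running) {
        if (tryProcessAvailableWork()) {
          backoffMillis = 1; // work found: reset the backoff
        } else {
          Thread.sleep(backoffMillis); // idle: sleep instead of spinning
          backoffMillis = Math.min(backoffMillis * 2, 1000); // capped growth
        }
      }
    }

This keeps an idle DirectRunner from burning a full core while it waits for new work.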
Gradle tests parallelization
Hi everyone, While working on fixing flaky tests, I was wondering what the current test parallelization of the Gradle build is. Luke gave me the information I needed. I have transcribed our conversation on this wiki page [1] so that everyone can benefit from it. [1]: https://cwiki.apache.org/confluence/display/BEAM/Gradle+tests+parallelization Etienne
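For quick reference, the knobs involved are typically these (illustrative values, not Beam's actual settings; see the wiki page for those):

    # gradle.properties
    org.gradle.parallel=true    # run tasks from different projects in parallel
    org.gradle.workers.max=8    # cap the number of concurrent workers

    // build.gradle, per module
    test {
      maxParallelForks = 4      // run test classes in up to 4 forked JVMs
    }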