[jira] [Work logged] (BEAM-9384) Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types
[ https://issues.apache.org/jira/browse/BEAM-9384?focusedWorklogId=394012&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394012 ] ASF GitHub Bot logged work on BEAM-9384: Author: ASF GitHub Bot Created on: 27/Feb/20 07:56 Start Date: 27/Feb/20 07:56 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10974: [BEAM-9384] Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types URL: https://github.com/apache/beam/pull/10974#issuecomment-591833469 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 394012) Time Spent: 1h 10m (was: 1h) > Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types > -- > > Key: BEAM-9384 > URL: https://issues.apache.org/jira/browse/BEAM-9384 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Ismaël Mejía >Assignee: Ismaël Mejía >Priority: Minor > Time Spent: 1h 10m > Remaining Estimate: 0h > > PTransforms that are parameterized by types may be able to infer a valid > SchemaCoder for a given type from the SchemaRegistry (if a Schema for the > given type is available). This method will provide a unified place to do that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9384) Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types
[ https://issues.apache.org/jira/browse/BEAM-9384?focusedWorklogId=394013&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394013 ] ASF GitHub Bot logged work on BEAM-9384: Author: ASF GitHub Bot Created on: 27/Feb/20 07:56 Start Date: 27/Feb/20 07:56 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10974: [BEAM-9384] Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types URL: https://github.com/apache/beam/pull/10974#issuecomment-591833541 Run JavaPortabilityApi PreCommit Issue Time Tracking --- Worklog Id: (was: 394013) Time Spent: 1h 20m (was: 1h 10m) > Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types > -- > > Key: BEAM-9384 > URL: https://issues.apache.org/jira/browse/BEAM-9384 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Ismaël Mejía >Assignee: Ismaël Mejía >Priority: Minor > Time Spent: 1h 20m > Remaining Estimate: 0h > > PTransforms that are parameterized by types may be able to infer a valid > SchemaCoder for a given type from the SchemaRegistry (if a Schema for the > given type is available). This method will provide a unified place to do that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9384) Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types
[ https://issues.apache.org/jira/browse/BEAM-9384?focusedWorklogId=394008&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394008 ] ASF GitHub Bot logged work on BEAM-9384: Author: ASF GitHub Bot Created on: 27/Feb/20 07:52 Start Date: 27/Feb/20 07:52 Worklog Time Spent: 10m Work Description: iemejia commented on pull request #10974: [BEAM-9384] Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types URL: https://github.com/apache/beam/pull/10974#discussion_r384960299

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java

@@ -288,6 +281,38 @@ public void registerSchemaProvider(SchemaProvider schemaProvider) {
     return getProviderResult((SchemaProvider p) -> p.fromRowFunction(typeDescriptor));
   }

+  /**
+   * Retrieve a {@link SchemaCoder} for a given {@link Class} type. If no schema exists, throws
+   * {@link NoSuchSchemaException}.
+   */
+  public <T> SchemaCoder<T> getSchemaCoder(Class<T> clazz) throws NoSuchSchemaException {
+    return getSchemaCoder(TypeDescriptor.of(clazz));
+  }
+
+  /**
+   * Retrieve a {@link SchemaCoder} for a given {@link TypeDescriptor} type. If no schema exists,
+   * throws {@link NoSuchSchemaException}.
+   */
+  public <T> SchemaCoder<T> getSchemaCoder(TypeDescriptor<T> typeDescriptor)
+      throws NoSuchSchemaException {
+    return SchemaCoder.of(
+        getSchema(typeDescriptor),
+        typeDescriptor,
+        getToRowFunction(typeDescriptor),
+        getFromRowFunction(typeDescriptor));
+  }

Review comment: I agree with you that `SchemaCoder` is an internal detail that regular users (authors of Pipelines) should not care about. I was tempted to mark this method as `@Internal`; however, PTransform authors (e.g. IO authors) will find it useful (as I did in the PR I mention above for KafkaIO schema support), so it is probably worth keeping it available. Also, I cannot think of a better place to put this method than here.

Issue Time Tracking --- Worklog Id: (was: 394008) Time Spent: 1h (was: 50m) > Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types > -- > > Key: BEAM-9384 > URL: https://issues.apache.org/jira/browse/BEAM-9384 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Ismaël Mejía >Assignee: Ismaël Mejía >Priority: Minor > Time Spent: 1h > Remaining Estimate: 0h > > PTransforms that are parameterized by types may be able to infer a valid > SchemaCoder for a given type from the SchemaRegistry (if a Schema for the > given type is available). This method will provide a unified place to do that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
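The method under review simply composes three existing registry lookups (getSchema, getToRowFunction, getFromRowFunction) into one SchemaCoder. A minimal Python model of that composition, as a sketch only: the class and method names below are illustrative, not Beam's actual API (the real registry is the Java class discussed above).

```python
# Simplified model (illustrative names) of how SchemaRegistry.getSchemaCoder
# composes existing per-type lookups into a single coder object.

class NoSuchSchemaException(Exception):
    """Raised when no schema is registered for the requested type."""

class SchemaCoder:
    """Bundles a schema with its to-Row / from-Row conversion functions."""
    def __init__(self, schema, to_row, from_row):
        self.schema = schema
        self.to_row = to_row
        self.from_row = from_row

class SchemaRegistry:
    def __init__(self):
        self._entries = {}  # type -> (schema, to_row, from_row)

    def register(self, typ, schema, to_row, from_row):
        self._entries[typ] = (schema, to_row, from_row)

    def get_schema(self, typ):
        if typ not in self._entries:
            raise NoSuchSchemaException(typ)
        return self._entries[typ][0]

    def get_schema_coder(self, typ):
        # The new method adds nothing beyond composing the lookups,
        # which is why it raises the same NoSuchSchemaException.
        schema = self.get_schema(typ)
        _, to_row, from_row = self._entries[typ]
        return SchemaCoder(schema, to_row, from_row)

registry = SchemaRegistry()
registry.register(dict, {"fields": ["x"]},
                  lambda d: (d["x"],), lambda row: {"x": row[0]})
coder = registry.get_schema_coder(dict)
```

This mirrors why the review comment argues against marking the method `@Internal`: a transform author who already registered a schema gets the coder from one call instead of wiring the three lookups by hand.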
[jira] [Work logged] (BEAM-9258) [Python] PTransform that connects to Cloud DLP deidentification service
[ https://issues.apache.org/jira/browse/BEAM-9258?focusedWorklogId=394007&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394007 ] ASF GitHub Bot logged work on BEAM-9258: Author: ASF GitHub Bot Created on: 27/Feb/20 07:49 Start Date: 27/Feb/20 07:49 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10961: [BEAM-9258] Add integration test for Cloud DLP URL: https://github.com/apache/beam/pull/10961#issuecomment-591830855 R: @aaltay cc: @kamilwu Issue Time Tracking --- Worklog Id: (was: 394007) Time Spent: 3h 50m (was: 3h 40m) > [Python] PTransform that connects to Cloud DLP deidentification service > --- > > Key: BEAM-9258 > URL: https://issues.apache.org/jira/browse/BEAM-9258 > Project: Beam > Issue Type: Sub-task > Components: io-py-gcp >Reporter: Michał Walenia >Assignee: Michał Walenia >Priority: Major > Fix For: 2.20.0 > > Time Spent: 3h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9295) Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
[ https://issues.apache.org/jira/browse/BEAM-9295?focusedWorklogId=394006&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394006 ] ASF GitHub Bot logged work on BEAM-9295: Author: ASF GitHub Bot Created on: 27/Feb/20 07:48 Start Date: 27/Feb/20 07:48 Worklog Time Spent: 10m Work Description: sunjincheng121 commented on issue #10945: [BEAM-9295] Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 URL: https://github.com/apache/beam/pull/10945#issuecomment-591809987 Run Python PreCommit Issue Time Tracking --- Worklog Id: (was: 394006) Time Spent: 2h 20m (was: 2h 10m) > Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 > --- > > Key: BEAM-9295 > URL: https://issues.apache.org/jira/browse/BEAM-9295 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > Apache Flink 1.10 has completed the final release vote, see [1]. So, I would > like to add Flink 1.10 build target and make Flink Runner compatible with > Flink 1.10. > And I appreciate it if you can leave your suggestions or comments! > [1] > https://lists.apache.org/thread.html/r97672d4d1e47372cebf23e6643a6cc30a06bfbdf3f277b0be3695b15%40%3Cdev.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9394) DynamicMessage handling of empty map violates schema nullability
[ https://issues.apache.org/jira/browse/BEAM-9394?focusedWorklogId=394004&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394004 ] ASF GitHub Bot logged work on BEAM-9394: Author: ASF GitHub Bot Created on: 27/Feb/20 07:47 Start Date: 27/Feb/20 07:47 Worklog Time Spent: 10m Work Description: alexvanboxel commented on pull request #10984: [BEAM-9394] DynamicMessage handling of empty map violates schema null… URL: https://github.com/apache/beam/pull/10984 DynamicMessage handling of empty map violates schema nullability. Fixed the handling of empty maps: it returned NULL, but should return an empty map in the Row. Added tests for Maps and Arrays; only Map had the incorrect behaviour. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
Post-Commit Tests Status (on master branch): a table of CI build-status badges for the Go and Java SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners (badge markup omitted; truncated in the original message).
[jira] [Updated] (BEAM-9395) Support Complex Types when converting HCatRecords to Rows
[ https://issues.apache.org/jira/browse/BEAM-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rahul Patwari updated BEAM-9395: Description: org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema() only supports converting Primitive types of HCatRecords to Rows. It can be enhanced to support complex types i.e. List, Map, Struct. > Support Complex Types when converting HCatRecords to Rows > - > > Key: BEAM-9395 > URL: https://issues.apache.org/jira/browse/BEAM-9395 > Project: Beam > Issue Type: Improvement > Components: io-java-hcatalog >Reporter: Rahul Patwari >Priority: Major > > org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema() only supports > converting Primitive types of HCatRecords to Rows. It can be enhanced to > support complex types i.e. List, Map, Struct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9295) Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
[ https://issues.apache.org/jira/browse/BEAM-9295?focusedWorklogId=394005&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394005 ] ASF GitHub Bot logged work on BEAM-9295: Author: ASF GitHub Bot Created on: 27/Feb/20 07:47 Start Date: 27/Feb/20 07:47 Worklog Time Spent: 10m Work Description: sunjincheng121 commented on issue #10945: [BEAM-9295] Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 URL: https://github.com/apache/beam/pull/10945#issuecomment-591830495 Run Python PreCommit Issue Time Tracking --- Worklog Id: (was: 394005) Time Spent: 2h 10m (was: 2h) > Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 > --- > > Key: BEAM-9295 > URL: https://issues.apache.org/jira/browse/BEAM-9295 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > Apache Flink 1.10 has completed the final release vote, see [1]. So, I would > like to add Flink 1.10 build target and make Flink Runner compatible with > Flink 1.10. > And I appreciate it if you can leave your suggestions or comments! > [1] > https://lists.apache.org/thread.html/r97672d4d1e47372cebf23e6643a6cc30a06bfbdf3f277b0be3695b15%40%3Cdev.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9395) Support Complex Types when converting HCatRecords to Rows
[ https://issues.apache.org/jira/browse/BEAM-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rahul Patwari updated BEAM-9395: Priority: Minor (was: Major) > Support Complex Types when converting HCatRecords to Rows > - > > Key: BEAM-9395 > URL: https://issues.apache.org/jira/browse/BEAM-9395 > Project: Beam > Issue Type: Improvement > Components: io-java-hcatalog >Reporter: Rahul Patwari >Priority: Minor > > org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema() only supports > converting Primitive types of HCatRecords to Rows. It can be enhanced to > support complex types i.e. List, Map, Struct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
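Supporting the complex types requested in BEAM-9395 mostly means recursing when a field's type is a container (List, Map, Struct) rather than a primitive. A hedged, self-contained Python sketch of that recursion follows; the type encodings and the function name are invented for illustration only (the actual change would live in the Java SchemaUtils.toBeamSchema).

```python
# Illustrative sketch: convert Hive/HCatalog-style type descriptors to
# Beam-style field types, recursing into list/map/struct. The string and
# tuple encodings used here are invented for this example.
def to_beam_field_type(hcat_type):
    primitives = {"int": "INT32", "bigint": "INT64",
                  "string": "STRING", "double": "DOUBLE"}
    if isinstance(hcat_type, str):  # primitive leaf type
        return primitives[hcat_type]
    kind = hcat_type[0]
    if kind == "list":  # list<T> -> ARRAY of the element type
        return ("ARRAY", to_beam_field_type(hcat_type[1]))
    if kind == "map":  # map<K, V> -> MAP of key/value types
        return ("MAP", to_beam_field_type(hcat_type[1]),
                to_beam_field_type(hcat_type[2]))
    if kind == "struct":  # struct<name: T, ...> -> nested ROW
        return ("ROW", {name: to_beam_field_type(t)
                        for name, t in hcat_type[1].items()})
    raise ValueError("unsupported type: %r" % (hcat_type,))
```

The key design point is that each container case calls back into the same function for its element types, so arbitrarily nested combinations (a list of structs of maps, say) fall out for free once the three container cases exist.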
[jira] [Resolved] (BEAM-9394) DynamicMessage handling of empty map violates schema nullability
[ https://issues.apache.org/jira/browse/BEAM-9394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex Van Boxel resolved BEAM-9394. -- Resolution: Fixed > DynamicMessage handling of empty map violates schema nullability > > > Key: BEAM-9394 > URL: https://issues.apache.org/jira/browse/BEAM-9394 > Project: Beam > Issue Type: Bug > Components: extensions-java-protobuf >Reporter: Alex Van Boxel >Assignee: Alex Van Boxel >Priority: Major > Fix For: 2.20.0 > > > DynamicMessage handling of empty map violates nullability. It should return > an empty map at the Row level. > Add tests for nullable map and array to verify behaviour. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9394) DynamicMessage handling of empty map violates schema nullability
[ https://issues.apache.org/jira/browse/BEAM-9394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex Van Boxel updated BEAM-9394: - Status: Open (was: Triage Needed) > DynamicMessage handling of empty map violates schema nullability > > > Key: BEAM-9394 > URL: https://issues.apache.org/jira/browse/BEAM-9394 > Project: Beam > Issue Type: Bug > Components: extensions-java-protobuf >Reporter: Alex Van Boxel >Assignee: Alex Van Boxel >Priority: Major > Fix For: 2.20.0 > > > DynamicMessage handling of empty map violates nullability. It should return > an empty map at the Row level. > Add tests for nullable map and array to verify behaviour. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9395) Support Complex Types when converting HCatRecords to Rows
Rahul Patwari created BEAM-9395: --- Summary: Support Complex Types when converting HCatRecords to Rows Key: BEAM-9395 URL: https://issues.apache.org/jira/browse/BEAM-9395 Project: Beam Issue Type: Improvement Components: io-java-hcatalog Reporter: Rahul Patwari -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9394) DynamicMessage handling of empty map violates schema nullability
Alex Van Boxel created BEAM-9394: Summary: DynamicMessage handling of empty map violates schema nullability Key: BEAM-9394 URL: https://issues.apache.org/jira/browse/BEAM-9394 Project: Beam Issue Type: Bug Components: extensions-java-protobuf Reporter: Alex Van Boxel Assignee: Alex Van Boxel Fix For: 2.20.0 DynamicMessage handling of empty map violates nullability. It should return an empty map at the Row level. Add tests for nullable map and array to verify behaviour. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9393) support schemas in state API
[ https://issues.apache.org/jira/browse/BEAM-9393?focusedWorklogId=393990&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393990 ] ASF GitHub Bot logged work on BEAM-9393: Author: ASF GitHub Bot Created on: 27/Feb/20 06:55 Start Date: 27/Feb/20 06:55 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10983: [BEAM-9393] Support schemas in state API URL: https://github.com/apache/beam/pull/10983 Add schema inference for types used in the state API. Add state overrides for Row types. Disable Coder inference for Row types, as we should only use Row with schemas. R: @dpmills Issue Time Tracking --- Worklog Id: (was: 393990) Remaining Estimate: 0h Time Spent: 10m > support schemas in state API > > > Key: BEAM-9393 > URL: https://issues.apache.org/jira/browse/BEAM-9393 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9393) support schemas in state API
Reuven Lax created BEAM-9393: Summary: support schemas in state API Key: BEAM-9393 URL: https://issues.apache.org/jira/browse/BEAM-9393 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9295) Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
[ https://issues.apache.org/jira/browse/BEAM-9295?focusedWorklogId=393988&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393988 ] ASF GitHub Bot logged work on BEAM-9295: Author: ASF GitHub Bot Created on: 27/Feb/20 06:44 Start Date: 27/Feb/20 06:44 Worklog Time Spent: 10m Work Description: sunjincheng121 commented on issue #10945: [BEAM-9295] Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 URL: https://github.com/apache/beam/pull/10945#issuecomment-591809987 Run Python PreCommit Issue Time Tracking --- Worklog Id: (was: 393988) Time Spent: 1h 50m (was: 1h 40m) > Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 > --- > > Key: BEAM-9295 > URL: https://issues.apache.org/jira/browse/BEAM-9295 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > Apache Flink 1.10 has completed the final release vote, see [1]. So, I would > like to add Flink 1.10 build target and make Flink Runner compatible with > Flink 1.10. > And I appreciate it if you can leave your suggestions or comments! > [1] > https://lists.apache.org/thread.html/r97672d4d1e47372cebf23e6643a6cc30a06bfbdf3f277b0be3695b15%40%3Cdev.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9295) Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
[ https://issues.apache.org/jira/browse/BEAM-9295?focusedWorklogId=393989&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393989 ] ASF GitHub Bot logged work on BEAM-9295: Author: ASF GitHub Bot Created on: 27/Feb/20 06:44 Start Date: 27/Feb/20 06:44 Worklog Time Spent: 10m Work Description: sunjincheng121 commented on issue #10945: [BEAM-9295] Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 URL: https://github.com/apache/beam/pull/10945#issuecomment-591810030 Run Java PreCommit Issue Time Tracking --- Worklog Id: (was: 393989) Time Spent: 2h (was: 1h 50m) > Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 > --- > > Key: BEAM-9295 > URL: https://issues.apache.org/jira/browse/BEAM-9295 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > Apache Flink 1.10 has completed the final release vote, see [1]. So, I would > like to add Flink 1.10 build target and make Flink Runner compatible with > Flink 1.10. > And I appreciate it if you can leave your suggestions or comments! > [1] > https://lists.apache.org/thread.html/r97672d4d1e47372cebf23e6643a6cc30a06bfbdf3f277b0be3695b15%40%3Cdev.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-9392) TestStream tests are all flaky
[ https://issues.apache.org/jira/browse/BEAM-9392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pablo Estrada reassigned BEAM-9392: --- Assignee: Sam Rohde > TestStream tests are all flaky > -- > > Key: BEAM-9392 > URL: https://issues.apache.org/jira/browse/BEAM-9392 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Pablo Estrada >Assignee: Sam Rohde >Priority: Major > > See: > [https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9392) TestStream tests are all flaky
[ https://issues.apache.org/jira/browse/BEAM-9392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17046227#comment-17046227 ] Pablo Estrada commented on BEAM-9392: - [~rohdesam] can you take a look please? > TestStream tests are all flaky > -- > > Key: BEAM-9392 > URL: https://issues.apache.org/jira/browse/BEAM-9392 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Pablo Estrada >Assignee: Sam Rohde >Priority: Major > > See: > [https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9392) TestStream tests are all flaky
[ https://issues.apache.org/jira/browse/BEAM-9392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pablo Estrada updated BEAM-9392: Status: Open (was: Triage Needed) > TestStream tests are all flaky > -- > > Key: BEAM-9392 > URL: https://issues.apache.org/jira/browse/BEAM-9392 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Pablo Estrada >Priority: Major > > See: > [https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9392) TestStream tests are all flaky
Pablo Estrada created BEAM-9392: --- Summary: TestStream tests are all flaky Key: BEAM-9392 URL: https://issues.apache.org/jira/browse/BEAM-9392 Project: Beam Issue Type: Bug Components: test-failures Reporter: Pablo Estrada See: [https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9381) Add display data to BoundedSourceSDF
[ https://issues.apache.org/jira/browse/BEAM-9381?focusedWorklogId=393987&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393987 ] ASF GitHub Bot logged work on BEAM-9381: Author: ASF GitHub Bot Created on: 27/Feb/20 06:40 Start Date: 27/Feb/20 06:40 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10968: [BEAM-9381] Adding display data to BoundedSource SDF URL: https://github.com/apache/beam/pull/10968#issuecomment-591808874 Run Python PreCommit Issue Time Tracking --- Worklog Id: (was: 393987) Time Spent: 1h 40m (was: 1.5h) > Add display data to BoundedSourceSDF > - > > Key: BEAM-9381 > URL: https://issues.apache.org/jira/browse/BEAM-9381 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393941&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393941 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 27/Feb/20 03:19 Start Date: 27/Feb/20 03:19 Worklog Time Spent: 10m Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. URL: https://github.com/apache/beam/pull/10915#discussion_r384893543

## File path: sdks/python/apache_beam/runners/interactive/utils.py

@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities to be used in Interactive Beam.
+"""
+
+from __future__ import absolute_import
+
+import pandas as pd
+
+from apache_beam.typehints import typehints as th
+from apache_beam.utils.windowed_value import WindowedValue
+
+COLUMN_PREFIX = 'el'
+
+
+def parse_row_(el, element_type, depth):
+  elements = []
+  columns = []
+
+  # Recurse if there are a known length of columns to parse into.
+  if isinstance(element_type, (th.TupleHint.TupleConstraint)):
+    for index, t in enumerate(element_type._inner_types()):
+      underlying_columns, underlying_elements = parse_row_(el[index], t,
+                                                           depth + 1)
+      column = '[{}]'.format(index)
+      if underlying_columns:
+        columns += [column + c for c in underlying_columns]
+      else:
+        columns += [column]
+      elements += underlying_elements
+
+  # Don't make new columns for variable length types.
+  elif isinstance(
+      element_type,
+      (th.ListHint.ListConstraint, th.TupleHint.TupleSequenceConstraint)):
+    elements = [pd.array(el)]
+
+  # For any other types, try to parse as a namedtuple, otherwise pass element
+  # through.
+  else:
+    fields = getattr(el, '_fields', None)
+    if fields:
+      columns = list(fields)

Review comment: I still need to take a look in my code to see if anything violates this, I'll let you know.

Issue Time Tracking --- Worklog Id: (was: 393941) Time Spent: 77h (was: 76h 50m) > Add streaming support to Interactive Beam > - > > Key: BEAM-8335 > URL: https://issues.apache.org/jira/browse/BEAM-8335 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Time Spent: 77h > Remaining Estimate: 0h > > This issue tracks the work items to introduce streaming support to the > Interactive Beam experience.
This will allow users to: > * Write and run a streaming job in IPython > * Automatically cache records from unbounded sources > * Add a replay experience that replays all cached records to simulate the > original pipeline execution > * Add controls to play/pause/stop/step individual elements from the cached > records > * Add ability to inspect/visualize unbounded PCollections -- This message was sent by Atlassian Jira (v8.3.4#803005)
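The recursive, index-based column naming performed by `parse_row_` above can be sketched without the Beam type-hint machinery. The helper below is a hypothetical stand-in (not part of the PR) that flattens plain nested Python tuples the way the `TupleConstraint` branch does, producing column suffixes like `'[0][1]'`:

```python
def flatten_row(el):
    """Flatten a nested tuple into (columns, values), mirroring the
    index-based naming used by parse_row_: '[0]', '[0][1]', etc."""
    columns, values = [], []
    if isinstance(el, tuple):
        for i, sub in enumerate(el):
            sub_cols, sub_vals = flatten_row(sub)
            if sub_cols:
                # Nested structure: prepend this position's index.
                columns += ['[{}]'.format(i) + c for c in sub_cols]
            else:
                # Leaf value: the index itself is the column.
                columns += ['[{}]'.format(i)]
            values += sub_vals
    else:
        # A primitive yields no column name; the caller adds the prefix.
        values = [el]
    return columns, values
```

With a prefix such as `'el'` prepended (as `parse_row` does), an element of type `Tuple[Tuple[int, str], int]` would yield columns `el[0][0]`, `el[0][1]`, `el[1]`.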
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393939&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393939 ]

ASF GitHub Bot logged work on BEAM-8335:
Author: ASF GitHub Bot
Created on: 27/Feb/20 03:18
Start Date: 27/Feb/20 03:18
Worklog Time Spent: 10m

Work Description: rohdesamuel commented on issue #10497: [BEAM-8335] Add the ReverseTestStream
URL: https://github.com/apache/beam/pull/10497#issuecomment-591758520

R: @robertwb Hey Robert, I think this is good to review. Please review for content, I haven't had the time to add better comments yet (also still needs formatting).

Issue Time Tracking
-------------------
Worklog Id: (was: 393939)
Time Spent: 76h 50m (was: 76h 40m)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393937&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393937 ]

ASF GitHub Bot logged work on BEAM-8335:
Author: ASF GitHub Bot
Created on: 27/Feb/20 03:17
Start Date: 27/Feb/20 03:17
Worklog Time Spent: 10m

Work Description: rohdesamuel commented on issue #10497: [BEAM-8335] Add the ReverseTestStream
URL: https://github.com/apache/beam/pull/10497#issuecomment-591758520

Hey Robert, I think this is good to review. Please review for content, I haven't had the time to add better comments yet (also still needs formatting).

Issue Time Tracking
-------------------
Worklog Id: (was: 393937)
Time Spent: 76h 40m (was: 76.5h)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393935&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393935 ]

ASF GitHub Bot logged work on BEAM-8335:
Author: ASF GitHub Bot
Created on: 27/Feb/20 03:17
Start Date: 27/Feb/20 03:17
Worklog Time Spent: 10m

Work Description: rohdesamuel commented on issue #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner.
URL: https://github.com/apache/beam/pull/10915#issuecomment-591758254

Hey Robert, you'll notice that this implementation is quite different from what we talked about. When trying to implement flattening of columns from nested values, the logic just kept getting more and more complicated. If we have a rule of "use named tuples for column names", that is a much simpler rule for users to remember when using the interactive runner. A future fix can be to add a tabular data type (i.e. Schemas) to the type hints.

Issue Time Tracking
-------------------
Worklog Id: (was: 393935)
Time Spent: 76.5h (was: 76h 20m)
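The "use named tuples for column names" rule discussed above can be illustrated with a small sketch. `WordCount` and `columns_for` are hypothetical names for illustration, not part of the PR:

```python
import collections

# Hypothetical element type: a namedtuple's fields become the column names.
WordCount = collections.namedtuple('WordCount', ['word', 'count'])


def columns_for(el, default_prefix='el'):
    """Return (columns, values): namedtuple fields name the columns;
    any other element falls back to a single prefixed column."""
    fields = getattr(el, '_fields', None)
    if fields:
        return list(fields), [getattr(el, f) for f in fields]
    return [default_prefix], [el]
```

Under this rule, users who want meaningful DataFrame column names simply emit namedtuples from their pipeline, rather than relying on nested-index flattening.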
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393934&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393934 ]

ASF GitHub Bot logged work on BEAM-8335:
Author: ASF GitHub Bot
Created on: 27/Feb/20 03:14
Start Date: 27/Feb/20 03:14
Worklog Time Spent: 10m

Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner.
URL: https://github.com/apache/beam/pull/10915#discussion_r384892436

## File path: sdks/python/apache_beam/runners/interactive/utils.py ##
(diff context continues from parse_row_ above)

+def parse_row(el, element_type, include_window_info=True, prefix=COLUMN_PREFIX):
+  # Reify the WindowedValue data to the Dataframe if asked.
+  windowed = None
+  if isinstance(el, WindowedValue):
+    if include_window_info:
+      windowed = el
+    el = el.value
+
+  # Parse the elements with the given type.
+  columns, elements = parse_row_(el, element_type, 0)
+
+  # If there are no columns returned, there is only a single column of a
+  # primitive data type.
+  if not columns:
+    columns = ['']
+
+  # Add the prefix to the columns that have an index.
+  for i in range(len(columns)):
+    if columns[i] == '' or columns[i][0] == '[':
+      columns[i] = prefix + columns[i]
+
+  # Reify the windowed columns and do a best-effort casting into Pandas DTypes.
+  if windowed:
+    columns += ['event_time', 'windows', 'pane_info']
+    elements += [
+        windowed.timestamp.micros, windowed.windows, windowed.pane_info
+    ]
+  return columns, elements
+
+
+def pcoll_to_df(
+    elements, element_type, include_window_info=False, prefix=COLUMN_PREFIX):
+  """Parses the given elements into a Dataframe.
+
+  Each column name will be prefixed with `prefix` concatenated with the nested
+  index, e.g. for a Tuple[Tuple[int, str], int], the column names will be:
+  [prefix[0][0], prefix[0][1], prefix[0]]. This is subject to change.
+  """
+  rows = []
+  columns = []
+
+  for e in elements:
+    columns, row = parse_row(e, element_type, include_window_info, prefix)

Review comment: Changed to inferring the schema once.

Issue Time Tracking
-------------------
Worklog Id: (was: 393934)
Time Spent: 76h 20m (was: 76h 10m)
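The "inferring the schema once" change mentioned in the review comment can be sketched as follows. This is illustrative only, not the PR's code; `parse` stands for any per-element parser returning `(columns, values)`:

```python
def to_table(elements, parse):
    """Infer the column schema from the first element only, then reuse it,
    instead of re-deriving the columns for every row as the loop above does."""
    iterator = iter(elements)
    try:
        first = next(iterator)
    except StopIteration:
        # No elements: no schema to infer.
        return [], []
    columns, first_row = parse(first)
    # Remaining rows reuse the inferred columns; only the values are parsed.
    rows = [first_row] + [parse(e)[1] for e in iterator]
    return columns, rows
```

This avoids redundant type inspection per element and assumes (as the PCollection's element type hint guarantees) that every element shares the same shape.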
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393933&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393933 ]

ASF GitHub Bot logged work on BEAM-8335:
Author: ASF GitHub Bot
Created on: 27/Feb/20 03:14
Start Date: 27/Feb/20 03:14
Worklog Time Spent: 10m

Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner.
URL: https://github.com/apache/beam/pull/10915#discussion_r384892377

## File path: sdks/python/apache_beam/runners/interactive/utils.py ##

+  # Add the prefix to the columns that have an index.
+  for i in range(len(columns)):
+    if columns[i] == '' or columns[i][0] == '[':

Review comment: This code was removed in a recent commit.

Issue Time Tracking
-------------------
Worklog Id: (was: 393933)
Time Spent: 76h 10m (was: 76h)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393931&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393931 ]

ASF GitHub Bot logged work on BEAM-8335:
Author: ASF GitHub Bot
Created on: 27/Feb/20 03:13
Start Date: 27/Feb/20 03:13
Worklog Time Spent: 10m

Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner.
URL: https://github.com/apache/beam/pull/10915#discussion_r384892289

## File path: sdks/python/apache_beam/runners/interactive/utils.py ##

+      if depth > 0:
+        columns = ['[{}]'.format(f) for f in fields]
+      elements = [el._asdict()[f] for f in fields]

Review comment: This code was removed in a recent commit, but ack on getattr being more natural

Issue Time Tracking
-------------------
Worklog Id: (was: 393931)
Time Spent: 75h 50m (was: 75h 40m)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393932&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393932 ]

ASF GitHub Bot logged work on BEAM-8335:
Author: ASF GitHub Bot
Created on: 27/Feb/20 03:13
Start Date: 27/Feb/20 03:13
Worklog Time Spent: 10m

Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner.
URL: https://github.com/apache/beam/pull/10915#discussion_r384892342

## File path: sdks/python/apache_beam/runners/interactive/utils.py ##

+  """Parses the given elements into a Dataframe.
+
+  Each column name will be prefixed with `prefix` concatenated with the nested
+  index, e.g. for a Tuple[Tuple[int, str], int], the column names will be:
+  [prefix[0][0], prefix[0][1], prefix[0]]. This is subject to change.

Review comment: ack, changed to dots (much simpler)

Issue Time Tracking
-------------------
Worklog Id: (was: 393932)
Time Spent: 76h (was: 75h 50m)
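The switch from bracketed indices to dots mentioned in the review comment ("changed to dots") would turn a name like `el[0][1]` into `el.0.1`. A hypothetical converter between the two conventions, for illustration only:

```python
import re


def brackets_to_dots(column):
    """Rewrite bracketed nested-index column names ('el[0][1]') into the
    simpler dotted form ('el.0.1') that the review settled on."""
    return re.sub(r'\[([^\]]+)\]', r'.\1', column)
```

The dotted form is easier to read and type in interactive notebooks, which is presumably why the reviewer called it "much simpler".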
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393930&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393930 ]

ASF GitHub Bot logged work on BEAM-8335:
Author: ASF GitHub Bot
Created on: 27/Feb/20 03:13
Start Date: 27/Feb/20 03:13
Worklog Time Spent: 10m

Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner.
URL: https://github.com/apache/beam/pull/10915#discussion_r384892102

## File path: sdks/python/apache_beam/runners/interactive/utils.py ##

+  else:
+    fields = getattr(el, '_fields', None)

Review comment: Unfortunately this is the way to check if it is a named tuple.

Issue Time Tracking
-------------------
Worklog Id: (was: 393930)
Time Spent: 75h 40m (was: 75.5h)
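The duck-typed check the comment refers to exists because Python has no dedicated isinstance test for namedtuples. A slightly stricter variant (a sketch, not the PR's code) also requires the `tuple` base class, so that arbitrary objects carrying a `_fields` attribute are not misclassified:

```python
import collections


def is_namedtuple_instance(el):
    """True for namedtuple instances: they subclass tuple and expose a
    _fields attribute, which plain tuples and lists do not."""
    return isinstance(el, tuple) and hasattr(el, '_fields')


Point = collections.namedtuple('Point', ['x', 'y'])
```

Both `collections.namedtuple` and `typing.NamedTuple` instances pass this check, since the latter also subclasses `tuple` and defines `_fields`.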
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393927&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393927 ]

ASF GitHub Bot logged work on BEAM-8335:
Author: ASF GitHub Bot
Created on: 27/Feb/20 03:12
Start Date: 27/Feb/20 03:12
Worklog Time Spent: 10m

Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner.
URL: https://github.com/apache/beam/pull/10915#discussion_r384891918

## File path: sdks/python/apache_beam/runners/interactive/utils.py ##

+def parse_row_(el, element_type, depth):

Review comment: Ack, I changed the name to just parse_row (because I use it in tests).

Issue Time Tracking
-------------------
Worklog Id: (was: 393927)
Time Spent: 75h 20m (was: 75h 10m)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393928=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393928 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 27/Feb/20 03:12 Start Date: 27/Feb/20 03:12 Worklog Time Spent: 10m Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. URL: https://github.com/apache/beam/pull/10915#discussion_r384891956 ## File path: sdks/python/apache_beam/runners/interactive/utils.py ## @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utilities to be used in Interactive Beam. +""" + +from __future__ import absolute_import + +import pandas as pd + +from apache_beam.typehints import typehints as th +from apache_beam.utils.windowed_value import WindowedValue + +COLUMN_PREFIX = 'el' Review comment: Ack, changed to elt This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393928) Time Spent: 75.5h (was: 75h 20m) > Add streaming support to Interactive Beam > - > > Key: BEAM-8335 > URL: https://issues.apache.org/jira/browse/BEAM-8335 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Time Spent: 75.5h > Remaining Estimate: 0h > > This issue tracks the work items to introduce streaming support to the > Interactive Beam experience. This will allow users to: > * Write and run a streaming job in IPython > * Automatically cache records from unbounded sources > * Add a replay experience that replays all cached records to simulate the > original pipeline execution > * Add controls to play/pause/stop/step individual elements from the cached > records > * Add ability to inspect/visualize unbounded PCollections -- This message was sent by Atlassian Jira (v8.3.4#803005)
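The review thread above concerns converting PCollection elements into a pandas DataFrame with `elt`-prefixed column names. A minimal sketch of that idea, using a hypothetical helper name and plain tuples rather than Beam's actual `utils.py` implementation:

```python
import pandas as pd

def elements_to_dataframe(elements):
    """Flatten elements into a DataFrame with 'elt'-prefixed columns.

    A toy illustration of the prefix discussed in the review; not the
    real Interactive Beam utility.
    """
    rows = [el if isinstance(el, tuple) else (el,) for el in elements]
    width = max(len(r) for r in rows)
    # A single column is named plain 'elt'; wider rows get indexed names.
    columns = ['elt'] if width == 1 else ['elt[{}]'.format(i) for i in range(width)]
    return pd.DataFrame(rows, columns=columns)

df = elements_to_dataframe([(1, 'a'), (2, 'b')])
```

Windowing and timestamp metadata (the `WindowedValue` import in the diff) would be carried in additional columns in the real utility; this sketch only shows the element-flattening step.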
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393919=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393919 ] ASF GitHub Bot logged work on BEAM-9322: Author: ASF GitHub Bot Created on: 27/Feb/20 02:30 Start Date: 27/Feb/20 02:30 Worklog Time Spent: 10m Work Description: rohdesamuel commented on issue #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled URL: https://github.com/apache/beam/pull/10934#issuecomment-591747015 > I see this error in the logs: > > 17:01:54 > assert event_tags.issubset(self.output_tags) > 17:01:54 E AssertionError: assert False > 17:01:54 E + where False = (set([None, '1'])) > 17:01:54 E + where = set(['a', 'b']).issubset > 17:01:54 E + and set([None, '1']) = .output_tags > > @rohdesamuel could you take a look? Yep, taking a look This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393919) Time Spent: 2.5h (was: 2h 20m) > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 2.5h > Remaining Estimate: 0h > > The Python SDK currently ignores any tags set on PCollections manually when > applying PTransforms when adding the PCollection to the PTransform > [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. 
> In the > [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated correctly, which can be a problem when relying on the output > PCollection tags to match the user-set values. > The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag, then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids" and by default set to False. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
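The bug described in the issue — user-set tags dropped in favor of an enumeration index — can be illustrated with a toy sketch in plain Python (hypothetical helper names, not the Beam SDK):

```python
def index_outputs(pvalues):
    # Mimics the reported bug: user-set tags are ignored and outputs
    # end up keyed by their enumeration index instead.
    return {str(i): pc for i, pc in enumerate(pvalues)}

def keep_user_tags(tagged_pvalues):
    # The proposed fix: propagate the user-set tags unchanged.
    return dict(tagged_pvalues)

buggy = index_outputs(['pc_a', 'pc_b'])                 # tags 'a'/'b' are lost
fixed = keep_user_tags([('a', 'pc_a'), ('b', 'pc_b')])  # tags preserved
```

The open design question in the issue (what tag a PCollection nested in a dict of lists should get) is exactly the case where neither of these two simple policies gives an obvious answer.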
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393918=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393918 ] ASF GitHub Bot logged work on BEAM-9322: Author: ASF GitHub Bot Created on: 27/Feb/20 02:30 Start Date: 27/Feb/20 02:30 Worklog Time Spent: 10m Work Description: aaltay commented on issue #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled URL: https://github.com/apache/beam/pull/10934#issuecomment-591746886 I see this error in the logs: 17:01:54 > assert event_tags.issubset(self.output_tags) 17:01:54 E AssertionError: assert False 17:01:54 E + where False = (set([None, '1'])) 17:01:54 E +where = set(['a', 'b']).issubset 17:01:54 E +and set([None, '1']) = .output_tags @rohdesamuel could you take a look? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393918) Time Spent: 2h 20m (was: 2h 10m) > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 2h 20m > Remaining Estimate: 0h > > The Python SDK currently ignores any tags set on PCollections manually when > applying PTransforms when adding the PCollection to the PTransform > [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. 
> In the > [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated correctly, which can be a problem when relying on the output > PCollection tags to match the user-set values. > The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag, then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids" and by default set to False. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7746) Add type hints to python code
[ https://issues.apache.org/jira/browse/BEAM-7746?focusedWorklogId=393916=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393916 ] ASF GitHub Bot logged work on BEAM-7746: Author: ASF GitHub Bot Created on: 27/Feb/20 02:25 Start Date: 27/Feb/20 02:25 Worklog Time Spent: 10m Work Description: udim commented on issue #10822: [BEAM-7746] Minor typing updates / fixes URL: https://github.com/apache/beam/pull/10822#issuecomment-591745730 LGTM, tests already failing here: https://builds.apache.org/job/beam_PreCommit_Python_Cron/2443/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393916) Time Spent: 67h 50m (was: 67h 40m) > Add type hints to python code > - > > Key: BEAM-7746 > URL: https://issues.apache.org/jira/browse/BEAM-7746 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chad Dombrova >Assignee: Chad Dombrova >Priority: Major > Time Spent: 67h 50m > Remaining Estimate: 0h > > As a developer of the beam source code, I would like the code to use pep484 > type hints so that I can clearly see what types are required, get completion > in my IDE, and enforce code correctness via a static analyzer like mypy. > This may be considered a precursor to BEAM-7060 > Work has been started here: [https://github.com/apache/beam/pull/9056] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
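For readers unfamiliar with PEP 484, a tiny example of the annotation style this issue tracks adding to the Beam codebase (generic Python, not Beam code):

```python
from typing import List

def split_words(line: str) -> List[str]:
    # With annotations like these, a static checker such as mypy can flag
    # e.g. split_words(42) before runtime, and IDEs can offer completion
    # on the returned list.
    return line.split()

words = split_words("hello beam")
```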
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393914=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393914 ] ASF GitHub Bot logged work on BEAM-9322: Author: ASF GitHub Bot Created on: 27/Feb/20 02:21 Start Date: 27/Feb/20 02:21 Worklog Time Spent: 10m Work Description: udim commented on issue #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled URL: https://github.com/apache/beam/pull/10934#issuecomment-591741341 This change may have broken precommits: https://builds.apache.org/job/beam_PreCommit_Python_Cron/2443/ edit: actually, not quite sure This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393914) Time Spent: 2h 10m (was: 2h) > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 2h 10m > Remaining Estimate: 0h > > The Python SDK currently ignores any tags set on PCollections manually when > applying PTransforms when adding the PCollection to the PTransform > [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. > In the > [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated correctly, which can be a problem when relying on the output > PCollection tags to match the user-set values. 
> The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag, then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids" and by default set to False. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393909=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393909 ] ASF GitHub Bot logged work on BEAM-9322: Author: ASF GitHub Bot Created on: 27/Feb/20 02:10 Start Date: 27/Feb/20 02:10 Worklog Time Spent: 10m Work Description: udim commented on issue #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled URL: https://github.com/apache/beam/pull/10934#issuecomment-591741341 I believe this change may have broken precommits: https://builds.apache.org/job/beam_PreCommit_Python_Cron/2443/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393909) Time Spent: 1h 50m (was: 1h 40m) > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 1h 50m > Remaining Estimate: 0h > > The Python SDK currently ignores any tags set on PCollections manually when > applying PTransforms when adding the PCollection to the PTransform > [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. > In the > [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated correctly, which can be a problem when relying on the output > PCollection tags to match the user-set values. 
> The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag, then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids" and by default set to False. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393910=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393910 ] ASF GitHub Bot logged work on BEAM-9322: Author: ASF GitHub Bot Created on: 27/Feb/20 02:10 Start Date: 27/Feb/20 02:10 Worklog Time Spent: 10m Work Description: udim commented on issue #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled URL: https://github.com/apache/beam/pull/10934#issuecomment-591741341 This change may have broken precommits: https://builds.apache.org/job/beam_PreCommit_Python_Cron/2443/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393910) Time Spent: 2h (was: 1h 50m) > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 2h > Remaining Estimate: 0h > > The Python SDK currently ignores any tags set on PCollections manually when > applying PTransforms when adding the PCollection to the PTransform > [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. > In the > [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated correctly, which can be a problem when relying on the output > PCollection tags to match the user-set values. 
> The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag, then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids" and by default set to False. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8487) Python typehints: support forward references
[ https://issues.apache.org/jira/browse/BEAM-8487?focusedWorklogId=393907=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393907 ] ASF GitHub Bot logged work on BEAM-8487: Author: ASF GitHub Bot Created on: 27/Feb/20 02:04 Start Date: 27/Feb/20 02:04 Worklog Time Spent: 10m Work Description: tvalentyn commented on issue #10932: [BEAM-8487] Handle nested forward references URL: https://github.com/apache/beam/pull/10932#issuecomment-591739717 LGTM, thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393907) Time Spent: 1h 50m (was: 1h 40m) > Python typehints: support forward references > > > Key: BEAM-8487 > URL: https://issues.apache.org/jira/browse/BEAM-8487 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > Typehints may be given as string literals: > https://www.python.org/dev/peps/pep-0484/#forward-references > These are currently not evaluated and result in errors. 
> Example 1: > {code} > def test_typed_callable_string_hints(self): > def do_fn(element: 'int') -> 'typehints.List[str]': > return [[str(element)] * 2] > result = [1, 2] | beam.ParDo(do_fn) > self.assertEqual([['1', '1'], ['2', '2']], sorted(result)) > {code} > This results in: > {code} > > return issubclass(sub, base) > E TypeError: issubclass() arg 2 must be a class or tuple of classes > typehints.py:1168: TypeError > {code} > Example 2: > {code} > def test_typed_dofn_string_hints(self): > class MyDoFn(beam.DoFn): > def process(self, element: 'int') -> 'typehints.List[str]': > return [[str(element)] * 2] > result = [1, 2] | beam.ParDo(MyDoFn()) > self.assertEqual([['1', '1'], ['2', '2']], sorted(result)) > {code} > This results in: > {code} > > raise ValueError('%s is not iterable' % type_hint) > E ValueError: typehints.List[str] is not iterable > typehints.py:1194: ValueError > {code} > where the non-iterable entity the error refers to is a string literal > ("typehints.List[str]"). -- This message was sent by Atlassian Jira (v8.3.4#803005)
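The failing examples above pass type hints as string literals. In plain Python, such forward references are resolved with typing.get_type_hints, the general mechanism PEP 484 prescribes; a sketch using the standard typing module rather than Beam's typehints module:

```python
import typing

def do_fn(element: 'int') -> 'typing.List[str]':
    # The annotations are plain strings until evaluated;
    # typing.get_type_hints resolves them in the function's
    # global namespace.
    return [str(element)] * 2

hints = typing.get_type_hints(do_fn)
```

Beam's fix has to do the equivalent evaluation before converting the hints into its internal typehints representation, which is why unevaluated strings previously fell through to `issubclass` and iteration errors.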
[jira] [Commented] (BEAM-8599) Establish consensus around how many concurrent minor versions of Python Beam should support, and deprecation policy for older versions.
[ https://issues.apache.org/jira/browse/BEAM-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046055#comment-17046055 ] Valentyn Tymofieiev commented on BEAM-8599: --- This is being discussed at: [1] https://lists.apache.org/thread.html/rd070afcebff5c967ec3b25d1f7a77db5278992c1508082bf5f636acd%40%3Cdev.beam.apache.org%3E > Establish consensus around how many concurrent minor versions of Python Beam > should support, and deprecation policy for older versions. > > > Key: BEAM-8599 > URL: https://issues.apache.org/jira/browse/BEAM-8599 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8487) Python typehints: support forward references
[ https://issues.apache.org/jira/browse/BEAM-8487?focusedWorklogId=393905=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393905 ] ASF GitHub Bot logged work on BEAM-8487: Author: ASF GitHub Bot Created on: 27/Feb/20 01:59 Start Date: 27/Feb/20 01:59 Worklog Time Spent: 10m Work Description: udim commented on pull request #10932: [BEAM-8487] Handle nested forward references URL: https://github.com/apache/beam/pull/10932#discussion_r384873795 ## File path: sdks/python/apache_beam/typehints/native_type_compatibility.py ## @@ -163,8 +163,14 @@ def is_any(typ): return typ is typing.Any +try: + _ForwardRef = typing.ForwardRef +except AttributeError: Review comment: done, also rebased This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393905) Time Spent: 1h 40m (was: 1.5h) > Python typehints: support forward references > > > Key: BEAM-8487 > URL: https://issues.apache.org/jira/browse/BEAM-8487 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > > Typehints may be given as string literals: > https://www.python.org/dev/peps/pep-0484/#forward-references > These are currently not evaluated and result in errors. 
> Example 1: > {code} > def test_typed_callable_string_hints(self): > def do_fn(element: 'int') -> 'typehints.List[str]': > return [[str(element)] * 2] > result = [1, 2] | beam.ParDo(do_fn) > self.assertEqual([['1', '1'], ['2', '2']], sorted(result)) > {code} > This results in: > {code} > > return issubclass(sub, base) > E TypeError: issubclass() arg 2 must be a class or tuple of classes > typehints.py:1168: TypeError > {code} > Example 2: > {code} > def test_typed_dofn_string_hints(self): > class MyDoFn(beam.DoFn): > def process(self, element: 'int') -> 'typehints.List[str]': > return [[str(element)] * 2] > result = [1, 2] | beam.ParDo(MyDoFn()) > self.assertEqual([['1', '1'], ['2', '2']], sorted(result)) > {code} > This results in: > {code} > > raise ValueError('%s is not iterable' % type_hint) > E ValueError: typehints.List[str] is not iterable > typehints.py:1194: ValueError > {code} > where the non-iterable entity the error refers to is a string literal > ("typehints.List[str]"). -- This message was sent by Atlassian Jira (v8.3.4#803005)
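The try/except in the diff above guards against `typing.ForwardRef` being absent: the public name exists only since Python 3.7, with a private `_ForwardRef` spelling in earlier releases. A standalone sketch of the same compatibility shim:

```python
import typing

try:
    _ForwardRef = typing.ForwardRef    # public name, Python 3.7+
except AttributeError:
    _ForwardRef = typing._ForwardRef   # private name in older typing modules

# A ForwardRef simply wraps the unevaluated annotation string.
ref = _ForwardRef('int')
```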
[jira] [Commented] (BEAM-8494) Python 3.8 Support
[ https://issues.apache.org/jira/browse/BEAM-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046048#comment-17046048 ] Valentyn Tymofieiev commented on BEAM-8494: --- There is a discussion relevant to this issue[1]. [1] https://lists.apache.org/thread.html/rd070afcebff5c967ec3b25d1f7a77db5278992c1508082bf5f636acd%40%3Cdev.beam.apache.org%3E > Python 3.8 Support > -- > > Key: BEAM-8494 > URL: https://issues.apache.org/jira/browse/BEAM-8494 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393904=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393904 ] ASF GitHub Bot logged work on BEAM-8841: Author: ASF GitHub Bot Created on: 27/Feb/20 01:48 Start Date: 27/Feb/20 01:48 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK URL: https://github.com/apache/beam/pull/10979#issuecomment-591735822 Run Python 3.5 PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393904) Time Spent: 2h 10m (was: 2h) > Add ability to perform BigQuery file loads using avro > - > > Key: BEAM-8841 > URL: https://issues.apache.org/jira/browse/BEAM-8841 > Project: Beam > Issue Type: Improvement > Components: io-py-gcp >Reporter: Chun Yang >Assignee: Chun Yang >Priority: Minor > Time Spent: 2h 10m > Remaining Estimate: 0h > > Currently, JSON format is used for file loads into BigQuery in the Python > SDK. JSON has some disadvantages including size of serialized data and > inability to represent NaN and infinity float values. > BigQuery supports loading files in avro format, which can overcome these > disadvantages. The Java SDK already supports loading files using avro format > (BEAM-2879) so it makes sense to support it in the Python SDK as well. > The change will be somewhere around > [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
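The issue description notes JSON's inability to represent NaN and infinity, which motivates the Avro load path. Python's json module shows the problem directly: by default it emits the non-standard tokens NaN/Infinity (invalid per RFC 8259, and rejected by strict parsers such as BigQuery's), and with allow_nan=False it refuses outright:

```python
import json

# Lenient (default) mode produces tokens that are not valid JSON.
lenient = json.dumps({"x": float("nan"), "y": float("inf")})

# Strict mode raises instead of emitting invalid output.
try:
    json.dumps({"x": float("nan")}, allow_nan=False)
    strict_raised = False
except ValueError:
    strict_raised = True
```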
[jira] [Work logged] (BEAM-7746) Add type hints to python code
[ https://issues.apache.org/jira/browse/BEAM-7746?focusedWorklogId=393903=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393903 ] ASF GitHub Bot logged work on BEAM-7746: Author: ASF GitHub Bot Created on: 27/Feb/20 01:48 Start Date: 27/Feb/20 01:48 Worklog Time Spent: 10m Work Description: chadrik commented on issue #10822: [BEAM-7746] Minor typing updates / fixes URL: https://github.com/apache/beam/pull/10822#issuecomment-591735657 @robertwb I don't think the test failures are my fault because they were passing before I rebased onto master... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393903) Time Spent: 67h 40m (was: 67.5h) > Add type hints to python code > - > > Key: BEAM-7746 > URL: https://issues.apache.org/jira/browse/BEAM-7746 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chad Dombrova >Assignee: Chad Dombrova >Priority: Major > Time Spent: 67h 40m > Remaining Estimate: 0h > > As a developer of the beam source code, I would like the code to use pep484 > type hints so that I can clearly see what types are required, get completion > in my IDE, and enforce code correctness via a static analyzer like mypy. > This may be considered a precursor to BEAM-7060 > Work has been started here: [https://github.com/apache/beam/pull/9056] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9391) Cleanup hardcoded unified worker images
Ankur Goenka created BEAM-9391: -- Summary: Cleanup hardcoded unified worker images Key: BEAM-9391 URL: https://issues.apache.org/jira/browse/BEAM-9391 Project: Beam Issue Type: Bug Components: sdk-py-harness, testing Reporter: Ankur Goenka Assignee: Ankur Goenka -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393899=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393899 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:41 Start Date: 27/Feb/20 01:41 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#issuecomment-591733789 > Looks good. I'd like to see at least one smoke test. There are validates runner unbounded source tests which will all be converted over. Some of them are failing right now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393899) Time Spent: 21h 40m (was: 21.5h) > Fn API SDF support > -- > > Key: BEAM-2939 > URL: https://issues.apache.org/jira/browse/BEAM-2939 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Henning Rohde >Assignee: Luke Cwik >Priority: Major > Labels: portability > Time Spent: 21h 40m > Remaining Estimate: 0h > > The Fn API should support streaming SDF. Detailed design TBD. > Once design is ready, expand subtasks similarly to BEAM-2822. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393897=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393897 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:40 Start Date: 27/Feb/20 01:40 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384868736 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. 
+  @UnboundedPerElement
+  static class UnboundedSourceAsSDFWrapperFn<OutputT, CheckpointT extends UnboundedSource.CheckpointMark>
+      extends DoFn<UnboundedSource<OutputT, CheckpointT>, ValueWithRecordId<OutputT>> {
+
+    private static final int DEFAULT_DESIRED_NUM_SPLITS = 20;
+    private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
+    private final Coder<CheckpointT> restrictionCoder;
+
+    private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> restrictionCoder) {
+      this.restrictionCoder = restrictionCoder;
+    }
+
+    @GetInitialRestriction
+    public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction(
+        @Element UnboundedSource<OutputT, CheckpointT> element) {
+      return KV.of(element, null);
+    }
+
+    @GetSize
+    public double getSize(
+        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+        return 1;
+      }
+
+      UnboundedReader<OutputT> reader =
+          restriction.getKey().createReader(pipelineOptions, restriction.getValue());
+      long size = reader.getSplitBacklogBytes();
+      if (size != UnboundedReader.BACKLOG_UNKNOWN) {
+        return size;
+      }
+      // TODO: Support "global" backlog reporting
+      // size = reader.getTotalBacklogBytes();
+      // if (size != UnboundedReader.BACKLOG_UNKNOWN) {
+      //   return size;
+      // }
+      return 1;
+    }
+
+    @SplitRestriction
+    public void splitRestriction(
+        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+        OutputReceiver<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> receiver,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      // The empty unbounded source is trivially done and hence we don't need to output any splits
+      // for it.
+      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+        return;
+      }
+
+      // The UnboundedSource API does not support splitting after a meaningful checkpoint mark has
+      // been created.
+      if (restriction.getValue() != null
+          && !(restriction.getValue()
+              instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) {
+        receiver.output(restriction);
+      }
+
+      try {
+        for (UnboundedSource<OutputT, CheckpointT> split :
+            restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) {
+          receiver.output(KV.of(split, null));
+        }
+      } catch (Exception e) {
+        receiver.output(restriction);
+      }
+    }
+
+    @NewTracker
+    public RestrictionTracker<
+            KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]>
+        restrictionTracker(
+            @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+            PipelineOptions pipelineOptions) {
+      return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions);
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+        RestrictionTracker<
+                KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]>
+            tracker,
+        OutputReceiver<ValueWithRecordId<OutputT>> receiver,
+        BundleFinalizer bundleFinalizer)
+        throws IOException {
+      UnboundedSourceValue<OutputT>[] out = new UnboundedSourceValue[1];
+      while (tracker.tryClaim(out)) {
+        receiver.outputWithTimestamp(
+            new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp());
+      }
+
+      // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial.
+      KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction =
+          tracker.currentRestriction();
+      if (currentRestriction.getValue() != null
+          && !(tracker.currentRestriction().getValue() instanceof NoopCheckpointMark)) {
+        bundleFinalizer.afterBundleCommit(
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393896=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393896 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:40 Start Date: 27/Feb/20 01:40 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384867760 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. Review comment: Will file JIRA if I can't do all the watermark reporting passing and implementation in the SDK harness. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393896) Time Spent: 21.5h (was: 21h 20m) > Fn API SDF support > -- > > Key: BEAM-2939 > URL: https://issues.apache.org/jira/browse/BEAM-2939 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Henning Rohde >Assignee: Luke Cwik >Priority: Major > Labels: portability > Time Spent: 21.5h > Remaining Estimate: 0h > > The Fn API should support streaming SDF. Detailed design TBD. 
> Once design is ready, expand subtasks similarly to BEAM-2822. -- This message was sent by Atlassian Jira (v8.3.4#803005)
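The restriction model described in the quoted javadoc — the element is the original source, the restriction is a (sub-source, checkpoint mark) pair, and re-splitting is only allowed while the checkpoint mark is `null` or a `NoopCheckpointMark` — can be sketched in Python. All names here are illustrative stand-ins for the Java classes under review, not Beam API:

```python
class NoopCheckpointMark:
    """A checkpoint mark that carries no state, so splitting remains safe."""


def can_split(restriction):
    """A (source, checkpoint_mark) restriction is re-splittable only while
    its checkpoint mark carries no state (None or a no-op mark)."""
    _source, checkpoint_mark = restriction
    return checkpoint_mark is None or isinstance(checkpoint_mark, NoopCheckpointMark)


def split_restriction(restriction, split_source, desired_num_splits=20):
    """Mirror of the @SplitRestriction rule: keep the restriction whole once
    a meaningful checkpoint exists, otherwise split the sub-source."""
    if not can_split(restriction):
        return [restriction]
    source, _ = restriction
    # Each sub-source starts with no checkpoint mark, so it stays splittable.
    return [(sub, None) for sub in split_source(source, desired_num_splits)]
```

The key property is that a restriction "loses" its splittability as soon as it accumulates checkpoint state, which is exactly why the wrapper can split the sub-source over and over beforehand.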
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393898=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393898 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:40 Start Date: 27/Feb/20 01:40 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384868638 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. + @UnboundedPerElement + static class UnboundedSourceAsSDFWrapperFn + extends DoFn, ValueWithRecordId> { + +private static final int DEFAULT_DESIRED_NUM_SPLITS = 20; Review comment: The default in Dataflow is `4 * first non-null of (maxNumWorkers, numWorkers, 5)`: https://github.com/apache/beam/blob/860131b5d47c830b772d3f1665a26a45ec85ab36/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L51 (I like the TODO in the method) Which is why I went with 20 as the default. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393898) Time Spent: 21.5h (was: 21h 20m) > Fn API SDF support > -- > > Key: BEAM-2939 > URL: https://issues.apache.org/jira/browse/BEAM-2939 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Henning Rohde >Assignee: Luke Cwik >Priority: Major > Labels: portability > Time Spent: 21.5h > Remaining Estimate: 0h > > The Fn API should support streaming SDF. Detailed design TBD. > Once design is ready, expand subtasks similarly to BEAM-2822. -- This message was sent by Atlassian Jira (v8.3.4#803005)
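The Dataflow default cited in the review comment, `4 * first non-null of (maxNumWorkers, numWorkers, 5)`, evaluates to 20 when neither worker count is set, which is where the wrapper's `DEFAULT_DESIRED_NUM_SPLITS = 20` comes from. A sketch of that arithmetic (the function name is illustrative, not part of any Beam API):

```python
def default_desired_num_splits(max_num_workers=None, num_workers=None):
    """Dataflow-style default: 4 * first non-null of (maxNumWorkers, numWorkers, 5)."""
    first_non_null = next(
        v for v in (max_num_workers, num_workers, 5) if v is not None)
    return 4 * first_non_null
```

With no worker counts configured this yields `4 * 5 == 20`, matching the constant chosen in the PR.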
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393895=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393895 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:40 Start Date: 27/Feb/20 01:40 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384867042 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -185,6 +200,37 @@ private Unbounded(@Nullable String name, UnboundedSource source) { @Override public final PCollection expand(PBegin input) { source.validate(); + + if (ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api") + && !ExperimentalOptions.hasExperiment( + input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) { +// We don't use Create here since Create is defined as a BoundedSource and using it would +// cause an infinite expansion loop. We can reconsider this if Create is implemented +// directly as a SplittableDoFn. +PCollection> outputWithIds = +input +.getPipeline() +.apply(Impulse.create()) +.apply( +MapElements.into(new TypeDescriptor>() {}) +.via(element -> (UnboundedSource) source)) +.setCoder( +SerializableCoder.of( +new TypeDescriptor>() {})) +.apply( +ParDo.of( +new UnboundedSourceAsSDFWrapperFn<>( +(Coder) source.getCheckpointMarkCoder( +.setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder())); +if (source.requiresDeduping()) { + outputWithIds.apply( + Distinct., byte[]>withRepresentativeValueFn( Review comment: No it won't, this was for some testing of mine. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393895) Time Spent: 21h 20m (was: 21h 10m) > Fn API SDF support > -- > > Key: BEAM-2939 > URL: https://issues.apache.org/jira/browse/BEAM-2939 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Henning Rohde >Assignee: Luke Cwik >Priority: Major > Labels: portability > Time Spent: 21h 20m > Remaining Estimate: 0h > > The Fn API should support streaming SDF. Detailed design TBD. > Once design is ready, expand subtasks similarly to BEAM-2822. -- This message was sent by Atlassian Jira (v8.3.4#803005)
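The `Distinct.withRepresentativeValueFn` call in the quoted diff deduplicates elements by a derived key — here, the record id attached by the SDF wrapper. The underlying idea, keyed deduplication keeping the first element seen per key, can be sketched as (illustration only, not Beam's implementation):

```python
def distinct_by(elements, representative_value_fn):
    """Keep the first element seen for each representative key; mirrors the
    intent of Distinct.withRepresentativeValueFn over a bounded input."""
    seen = set()
    out = []
    for element in elements:
        key = representative_value_fn(element)
        if key not in seen:
            seen.add(key)
            out.append(element)
    return out
```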
[jira] [Work logged] (BEAM-9381) Add display data to BoundedSourceSDF
[ https://issues.apache.org/jira/browse/BEAM-9381?focusedWorklogId=393892=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393892 ] ASF GitHub Bot logged work on BEAM-9381: Author: ASF GitHub Bot Created on: 27/Feb/20 01:34 Start Date: 27/Feb/20 01:34 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10968: [BEAM-9381] Adding display data to BoundedSource SDF URL: https://github.com/apache/beam/pull/10968#issuecomment-591731834 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393892) Time Spent: 1.5h (was: 1h 20m) > Add display data to BoundedSourceSDF > - > > Key: BEAM-9381 > URL: https://issues.apache.org/jira/browse/BEAM-9381 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393893=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393893 ] ASF GitHub Bot logged work on BEAM-8841: Author: ASF GitHub Bot Created on: 27/Feb/20 01:34 Start Date: 27/Feb/20 01:34 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK URL: https://github.com/apache/beam/pull/10979#issuecomment-591731930 Run Python 3.7 PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393893) Time Spent: 2h (was: 1h 50m) > Add ability to perform BigQuery file loads using avro > - > > Key: BEAM-8841 > URL: https://issues.apache.org/jira/browse/BEAM-8841 > Project: Beam > Issue Type: Improvement > Components: io-py-gcp >Reporter: Chun Yang >Assignee: Chun Yang >Priority: Minor > Time Spent: 2h > Remaining Estimate: 0h > > Currently, JSON format is used for file loads into BigQuery in the Python > SDK. JSON has some disadvantages including size of serialized data and > inability to represent NaN and infinity float values. > BigQuery supports loading files in avro format, which can overcome these > disadvantages. The Java SDK already supports loading files using avro format > (BEAM-2879) so it makes sense to support it in the Python SDK as well. > The change will be somewhere around > [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
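The JSON limitation the ticket describes is easy to demonstrate with Python's standard library: by default `json.dumps` emits the tokens `NaN`/`Infinity`, which are not valid JSON per the spec, and in strict mode it refuses outright, so such float values cannot round-trip through a spec-compliant JSON load job:

```python
import json

row = {"score": float("nan"), "bound": float("inf")}

# Default behavior: emits NaN/Infinity tokens, which are NOT valid JSON.
print(json.dumps(row))  # {"score": NaN, "bound": Infinity}

# Strict mode: serialization fails entirely.
try:
    json.dumps(row, allow_nan=False)
except ValueError as exc:
    print("rejected:", exc)
```

Avro has first-class float/double encodings for these values, which is the advantage the ticket calls out.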
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393890=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393890 ] ASF GitHub Bot logged work on BEAM-8841: Author: ASF GitHub Bot Created on: 27/Feb/20 01:32 Start Date: 27/Feb/20 01:32 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK URL: https://github.com/apache/beam/pull/10979#issuecomment-591731506 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393890) Time Spent: 1h 50m (was: 1h 40m) > Add ability to perform BigQuery file loads using avro > - > > Key: BEAM-8841 > URL: https://issues.apache.org/jira/browse/BEAM-8841 > Project: Beam > Issue Type: Improvement > Components: io-py-gcp >Reporter: Chun Yang >Assignee: Chun Yang >Priority: Minor > Time Spent: 1h 50m > Remaining Estimate: 0h > > Currently, JSON format is used for file loads into BigQuery in the Python > SDK. JSON has some disadvantages including size of serialized data and > inability to represent NaN and infinity float values. > BigQuery supports loading files in avro format, which can overcome these > disadvantages. The Java SDK already supports loading files using avro format > (BEAM-2879) so it makes sense to support it in the Python SDK as well. > The change will be somewhere around > [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393889=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393889 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:30 Start Date: 27/Feb/20 01:30 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384866125 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. 
[jira] [Closed] (BEAM-8975) Add Thrift Parser for ThriftIO
[ https://issues.apache.org/jira/browse/BEAM-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Larsen closed BEAM-8975. -- > Add Thrift Parser for ThriftIO > -- > > Key: BEAM-8975 > URL: https://issues.apache.org/jira/browse/BEAM-8975 > Project: Beam > Issue Type: New Feature > Components: io-java-files >Reporter: Chris Larsen >Assignee: Chris Larsen >Priority: Minor > Fix For: Not applicable > > Time Spent: 1.5h > Remaining Estimate: 0h > > This ticket is related to > [BEAM-8561|https://issues.apache.org/jira/projects/BEAM/issues/BEAM-8561?filter=allissues]. > As there are a large number of files to review for the > [PR|https://github.com/apache/beam/pull/10290] for ThriftIO this ticket will > serve as the tracker for the submission of a PR relating to the parser and > document model that will be used by ThriftIO. The aim is to reduce the number > of files submitted with each PR. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8975) Add Thrift Parser for ThriftIO
[ https://issues.apache.org/jira/browse/BEAM-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Larsen resolved BEAM-8975. Fix Version/s: Not applicable Resolution: Won't Do Doesn't need to be implemented as ThriftIO was redesigned to handle Thrift encoded files instead of Thrift IDL files. > Add Thrift Parser for ThriftIO > -- > > Key: BEAM-8975 > URL: https://issues.apache.org/jira/browse/BEAM-8975 > Project: Beam > Issue Type: New Feature > Components: io-java-files >Reporter: Chris Larsen >Assignee: Chris Larsen >Priority: Minor > Fix For: Not applicable > > Time Spent: 1.5h > Remaining Estimate: 0h > > This ticket is related to > [BEAM-8561|https://issues.apache.org/jira/projects/BEAM/issues/BEAM-8561?filter=allissues]. > As there are a large number of files to review for the > [PR|https://github.com/apache/beam/pull/10290] for ThriftIO this ticket will > serve as the tracker for the submission of a PR relating to the parser and > document model that will be used by ThriftIO. The aim is to reduce the number > of files submitted with each PR. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (BEAM-8494) Python 3.8 Support
[ https://issues.apache.org/jira/browse/BEAM-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046018#comment-17046018 ] Valentyn Tymofieiev edited comment on BEAM-8494 at 2/27/20 1:02 AM: I tried to install Beam on Py 3.8 and run unit tests. Findings so far: - We need to relax some dependencies (fastavro, pyarrow, pandas), otherwise wheels fail to build on my platform. - For some reason we skip typing_extensions on Python 3.8 [1], and that causes a large number of tests to fail, which otherwise pass if we install this dependency. [1] https://github.com/apache/beam/blob/4a25aa0dcaf19184ac279f566917132f5ae2be9d/sdks/python/setup.py#L172 was (Author: tvalentyn): I tried to install Beam on Py 3.8 and run unit tests. Findings so far: - We need to relax some dependencies (fastavro, pyarrow, pandas), otherwise wheels fail to build on my platform. - It seems like we need typing_extensions for tests to pass. For some reason we skip it on Python 3.8 [1]. [1] https://github.com/apache/beam/blob/4a25aa0dcaf19184ac279f566917132f5ae2be9d/sdks/python/setup.py#L172 > Python 3.8 Support > -- > > Key: BEAM-8494 > URL: https://issues.apache.org/jira/browse/BEAM-8494 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8494) Python 3.8 Support
[ https://issues.apache.org/jira/browse/BEAM-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046021#comment-17046021 ] Valentyn Tymofieiev commented on BEAM-8494: --- Looks like we'd have to chase down some failing tests. -- Ran 3172 tests in 863.510s FAILED (SKIP=309, errors=31, failures=1) > Python 3.8 Support > -- > > Key: BEAM-8494 > URL: https://issues.apache.org/jira/browse/BEAM-8494 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (BEAM-8494) Python 3.8 Support
[ https://issues.apache.org/jira/browse/BEAM-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046021#comment-17046021 ] Valentyn Tymofieiev edited comment on BEAM-8494 at 2/27/20 1:02 AM: Looks like we'd still have to chase down more failing tests. -- Ran 3172 tests in 863.510s FAILED (SKIP=309, errors=31, failures=1) was (Author: tvalentyn): Looks like we'd have to chase down some failing tests. -- Ran 3172 tests in 863.510s FAILED (SKIP=309, errors=31, failures=1) > Python 3.8 Support > -- > > Key: BEAM-8494 > URL: https://issues.apache.org/jira/browse/BEAM-8494 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8494) Python 3.8 Support
[ https://issues.apache.org/jira/browse/BEAM-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046018#comment-17046018 ] Valentyn Tymofieiev commented on BEAM-8494: --- I tried to install Beam on Py 3.8 and run unit tests. Findings so far: - We need to relax some dependencies (fastavro, pyarrow, pandas), otherwise wheels fail to build on my platform. - It seems like we need typing_extensions for tests to pass. For some reason we skip it on Python 3.8 [1]. [1] https://github.com/apache/beam/blob/4a25aa0dcaf19184ac279f566917132f5ae2be9d/sdks/python/setup.py#L172 > Python 3.8 Support > -- > > Key: BEAM-8494 > URL: https://issues.apache.org/jira/browse/BEAM-8494 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
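The setup.py conditional under discussion gates `typing_extensions` on the interpreter version. A hedged sketch of that kind of version-gated dependency selection (this is an illustration of the pattern, not Beam's actual setup.py, and the version pins are invented):

```python
import sys


def extra_requirements(version_info=sys.version_info):
    """Sketch of version-gated dependencies. Skipping typing_extensions on a
    newer interpreter is only safe if every feature used from it has landed
    in that interpreter's stdlib typing module."""
    reqs = ["fastavro", "pyarrow"]  # illustrative; real pins differ
    if version_info < (3, 8):
        reqs.append("typing_extensions>=3.7.0")
    return reqs
```

The finding in the comment — tests failing on 3.8 unless `typing_extensions` is installed — suggests the gate's assumption did not hold for that release.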
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393879=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393879 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384853099 ## File path: sdks/python/apache_beam/runners/portability/stager.py ## @@ -377,34 +436,33 @@ def _stage_jar_packages(self, jar_packages, staging_location, temp_dir): for package in local_packages: basename = os.path.basename(package) - staged_path = FileSystems.join(staging_location, basename) - self.stage_artifact(package, staged_path) - resources.append(basename) + resources.append((package, basename)) return resources - def _stage_extra_packages(self, extra_packages, staging_location, temp_dir): -# type: (...) -> List[str] + @staticmethod + def _stage_extra_packages(extra_packages, temp_dir): Review comment: Similarly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393879) Time Spent: 3h 20m (was: 3h 10m) > Staging artifacts from environment > -- > > Key: BEAM-9056 > URL: https://issues.apache.org/jira/browse/BEAM-9056 > Project: Beam > Issue Type: Sub-task > Components: java-fn-execution >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > staging artifacts from artifact information embedded in environment proto. > detail: > https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393875=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393875 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384853067 ## File path: sdks/python/apache_beam/runners/portability/stager.py ## @@ -331,22 +389,23 @@ def _download_file(from_url, to_path): def _is_remote_path(path): return path.find('://') != -1 - def _stage_jar_packages(self, jar_packages, staging_location, temp_dir): -# type: (...) -> List[str] + @staticmethod + def _stage_jar_packages(jar_packages, temp_dir): Review comment: Does this actually stage them, or just add them to the returned list? If not, the name and docstring should be updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393875) Time Spent: 3h 10m (was: 3h) > Staging artifacts from environment > -- > > Key: BEAM-9056 > URL: https://issues.apache.org/jira/browse/BEAM-9056 > Project: Beam > Issue Type: Sub-task > Components: java-fn-execution >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > staging artifacts from artifact information embedded in environment proto. > detail: > https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393876=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393876 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384850669 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ## @@ -88,11 +88,13 @@ public static SdkComponents create(RunnerApi.Components components) { public static SdkComponents create(PipelineOptions options) { SdkComponents sdkComponents = new SdkComponents(RunnerApi.Components.getDefaultInstance(), ""); PortablePipelineOptions portablePipelineOptions = options.as(PortablePipelineOptions.class); -sdkComponents.defaultEnvironmentId = -sdkComponents.registerEnvironment( -Environments.createOrGetDefaultEnvironment( +sdkComponents.registerEnvironment( +Environments.createOrGetDefaultEnvironment( portablePipelineOptions.getDefaultEnvironmentType(), -portablePipelineOptions.getDefaultEnvironmentConfig())); +portablePipelineOptions.getDefaultEnvironmentConfig()) +.toBuilder() +.addAllDependencies(Environments.getArtifacts(options)) Review comment: Would this be better placed in createOrGetDefaultEnvironment? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393876) Time Spent: 3h 10m (was: 3h) > Staging artifacts from environment > -- > > Key: BEAM-9056 > URL: https://issues.apache.org/jira/browse/BEAM-9056 > Project: Beam > Issue Type: Sub-task > Components: java-fn-execution >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > staging artifacts from artifact information embedded in environment proto. > detail: > https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393881=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393881 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384851171 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ## @@ -261,14 +263,20 @@ public String registerCoder(Coder coder) throws IOException { * return the same unique ID. */ public String registerEnvironment(Environment env) { +String environmentId; String existing = environmentIds.get(env); if (existing != null) { - return existing; + environmentId = existing; +} else { + String name = uniqify(env.getUrn(), environmentIds.values()); + environmentIds.put(env, name); + componentsBuilder.putEnvironments(name, env); + environmentId = name; } -String name = uniqify(env.getUrn(), environmentIds.values()); -environmentIds.put(env, name); -componentsBuilder.putEnvironments(name, env); -return name; +if (defaultEnvironmentId == null) { Review comment: +1 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393881) Time Spent: 3.5h (was: 3h 20m) > Staging artifacts from environment > -- > > Key: BEAM-9056 > URL: https://issues.apache.org/jira/browse/BEAM-9056 > Project: Beam > Issue Type: Sub-task > Components: java-fn-execution >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 3.5h > Remaining Estimate: 0h > > staging artifacts from artifact information embedded in environment proto. > detail: > https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog -- This message was sent by Atlassian Jira (v8.3.4#803005)
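The change under review makes `registerEnvironment` idempotent — registering the same environment twice returns the same unique id — and records the first registration as the default. The get-or-assign-unique-id pattern can be sketched as (names are illustrative, not Beam API):

```python
class EnvironmentRegistry:
    """Sketch of SdkComponents.registerEnvironment's dedup logic: same
    environment -> same id; first registration becomes the default."""

    def __init__(self):
        self._ids = {}  # environment -> assigned id
        self.default_id = None

    def register(self, env, urn):
        env_id = self._ids.get(env)
        if env_id is None:
            # "uniqify": suffix the urn until the id is unused.
            env_id, n = urn, 0
            while env_id in self._ids.values():
                n += 1
                env_id = "%s%d" % (urn, n)
            self._ids[env] = env_id
        if self.default_id is None:
            self.default_id = env_id
        return env_id
```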
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393877=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393877 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384853321 ## File path: sdks/python/apache_beam/runners/portability/stager.py ## @@ -547,21 +601,22 @@ def _desired_sdk_filename_in_staging_location(sdk_location): else: return DATAFLOW_SDK_TARBALL_FILE - def _stage_beam_sdk(self, sdk_remote_location, staging_location, temp_dir): -# type: (...) -> List[str] + @staticmethod + def _stage_beam_sdk(sdk_remote_location, temp_dir): Review comment: same This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393877) Time Spent: 3h 20m (was: 3h 10m) > Staging artifacts from environment > -- > > Key: BEAM-9056 > URL: https://issues.apache.org/jira/browse/BEAM-9056 > Project: Beam > Issue Type: Sub-task > Components: java-fn-execution >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > staging artifacts from artifact information embedded in environment proto. > detail: > https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393880=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393880 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384851813 ## File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java ## @@ -200,11 +143,44 @@ public PipelineResult run(Pipeline pipeline) { prepareJobResponse.getArtifactStagingEndpoint(); String stagingSessionToken = prepareJobResponse.getStagingSessionToken(); + ImmutableList.Builder filesToStageBuilder = ImmutableList.builder(); Review comment: Perhaps it's worth pulling this out into a separate method? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393880) > Staging artifacts from environment > -- > > Key: BEAM-9056 > URL: https://issues.apache.org/jira/browse/BEAM-9056 > Project: Beam > Issue Type: Sub-task > Components: java-fn-execution >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > staging artifacts from artifact information embedded in environment proto. > detail: > https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393878=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393878 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384849942 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java ## @@ -175,6 +197,90 @@ public static Environment createProcessEnvironment( } } + public static Collection getArtifacts(PipelineOptions options) { +Set pathsToStage = Sets.newHashSet(); +// TODO(heejong): remove jar_packages experimental flag when cross-language dependency +// management is implemented for all runners. +List experiments = options.as(ExperimentalOptions.class).getExperiments(); +if (experiments != null) { + Optional jarPackages = + experiments.stream() + .filter((String flag) -> flag.startsWith("jar_packages=")) + .findFirst(); + jarPackages.ifPresent( + s -> pathsToStage.addAll(Arrays.asList(s.replaceFirst("jar_packages=", "").split(","; +} +List stagingFiles = options.as(PortablePipelineOptions.class).getFilesToStage(); +if (stagingFiles == null) { + pathsToStage.addAll( + detectClassPathResourcesToStage(Environments.class.getClassLoader(), options)); + if (pathsToStage.isEmpty()) { +throw new IllegalArgumentException("No classpath elements found."); + } + LOG.debug( + "PortablePipelineOptions.filesToStage was not specified. " + + "Defaulting to files from the classpath: {}", + pathsToStage.size()); +} else { + pathsToStage.addAll(stagingFiles); +} + +ImmutableList.Builder filesToStage = ImmutableList.builder(); +for (String path : pathsToStage) { + File file = new File(path); + if (new File(path).exists()) { +// Spurious items get added to the classpath. 
Filter by just those that exist. +if (file.isDirectory()) { + // Zip up directories so we can upload them to the artifact service. Review comment: Looks like this was the previous behavior. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393878) Time Spent: 3h 20m (was: 3h 10m) > Staging artifacts from environment > -- > > Key: BEAM-9056 > URL: https://issues.apache.org/jira/browse/BEAM-9056 > Project: Beam > Issue Type: Sub-task > Components: java-fn-execution >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > staging artifacts from artifact information embedded in environment proto. > detail: > https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog -- This message was sent by Atlassian Jira (v8.3.4#803005)
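The `getArtifacts` snippet above first scans the experiments list for a `jar_packages=` flag, strips the prefix from the first match, and splits the remainder on commas. A stand-alone Python sketch of that filtering step (mirroring the Java stream logic, not the actual Beam implementation):

```python
def jar_packages_from_experiments(experiments):
    """Extract staging paths from a 'jar_packages=a.jar,b.jar' experiment flag.

    Sketch of the filter/findFirst logic in Environments.getArtifacts:
    only the first matching flag is consumed, and a missing or empty
    experiments list yields no paths.
    """
    paths = set()
    if experiments:
        for flag in experiments:
            if flag.startswith("jar_packages="):
                paths.update(flag.replace("jar_packages=", "", 1).split(","))
                break  # the Java code uses findFirst(), so stop at the first match
    return paths
```

After this step the real code unions in `filesToStage` (or classpath detection as a fallback), then filters out paths that do not exist on disk.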
[jira] [Work logged] (BEAM-7746) Add type hints to python code
[ https://issues.apache.org/jira/browse/BEAM-7746?focusedWorklogId=393873=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393873 ] ASF GitHub Bot logged work on BEAM-7746: Author: ASF GitHub Bot Created on: 27/Feb/20 00:37 Start Date: 27/Feb/20 00:37 Worklog Time Spent: 10m Work Description: chadrik commented on issue #10822: [BEAM-7746] Minor typing updates / fixes URL: https://github.com/apache/beam/pull/10822#issuecomment-591716771 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393873) Time Spent: 67.5h (was: 67h 20m) > Add type hints to python code > - > > Key: BEAM-7746 > URL: https://issues.apache.org/jira/browse/BEAM-7746 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chad Dombrova >Assignee: Chad Dombrova >Priority: Major > Time Spent: 67.5h > Remaining Estimate: 0h > > As a developer of the beam source code, I would like the code to use pep484 > type hints so that I can clearly see what types are required, get completion > in my IDE, and enforce code correctness via a static analyzer like mypy. > This may be considered a precursor to BEAM-7060 > Work has been started here: [https://github.com/apache/beam/pull/9056] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8618) Tear down unused DoFns periodically in Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-8618?focusedWorklogId=393870=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393870 ] ASF GitHub Bot logged work on BEAM-8618: Author: ASF GitHub Bot Created on: 27/Feb/20 00:28 Start Date: 27/Feb/20 00:28 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10655: [BEAM-8618] Tear down unused DoFns periodically in Python SDK harness. URL: https://github.com/apache/beam/pull/10655 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393870) Time Spent: 5.5h (was: 5h 20m) > Tear down unused DoFns periodically in Python SDK harness > - > > Key: BEAM-8618 > URL: https://issues.apache.org/jira/browse/BEAM-8618 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Time Spent: 5.5h > Remaining Estimate: 0h > > Per the discussion in the ML, detail can be found [1], the teardown of DoFns > should be supported in the portability framework. It happens at two places: > 1) Upon the control service termination > 2) Tear down the unused DoFns periodically > The aim of this JIRA is to add support for tear down the unused DoFns > periodically in Python SDK harness. > [1] > https://lists.apache.org/thread.html/0c4a4cf83cf2e35c3dfeb9d906e26cd82d3820968ba6f862f91739e4@%3Cdev.beam.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393868=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393868 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 00:27 Start Date: 27/Feb/20 00:27 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384840768 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -185,6 +200,37 @@ private Unbounded(@Nullable String name, UnboundedSource source) { @Override public final PCollection expand(PBegin input) { source.validate(); + + if (ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api") + && !ExperimentalOptions.hasExperiment( + input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) { +// We don't use Create here since Create is defined as a BoundedSource and using it would +// cause an infinite expansion loop. We can reconsider this if Create is implemented +// directly as a SplittableDoFn. +PCollection> outputWithIds = +input +.getPipeline() +.apply(Impulse.create()) +.apply( +MapElements.into(new TypeDescriptor>() {}) +.via(element -> (UnboundedSource) source)) +.setCoder( +SerializableCoder.of( +new TypeDescriptor>() {})) +.apply( +ParDo.of( +new UnboundedSourceAsSDFWrapperFn<>( +(Coder) source.getCheckpointMarkCoder( +.setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder())); +if (source.requiresDeduping()) { + outputWithIds.apply( + Distinct., byte[]>withRepresentativeValueFn( Review comment: I'm curious what `WindowingStrategy` is going to apply here. If it's using `GlobalWindow`, will it work in streaming mode? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393868) Time Spent: 21h (was: 20h 50m) > Fn API SDF support > -- > > Key: BEAM-2939 > URL: https://issues.apache.org/jira/browse/BEAM-2939 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Henning Rohde >Assignee: Luke Cwik >Priority: Major > Labels: portability > Time Spent: 21h > Remaining Estimate: 0h > > The Fn API should support streaming SDF. Detailed design TBD. > Once design is ready, expand subtasks similarly to BEAM-2822. -- This message was sent by Atlassian Jira (v8.3.4#803005)
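The `Read.java` expansion above replaces the primitive unbounded read with Impulse → MapElements (each impulse element mapped to the source) → ParDo over a splittable DoFn wrapper. A toy sketch of that shape in plain Python (hypothetical names, no Beam dependency):

```python
def expand_unbounded_read(source, sdf_wrapper):
    """Mimic the Impulse -> MapElements -> ParDo(SDF wrapper) expansion."""
    impulse = [b""]                       # Impulse: a single empty-bytes element
    sources = [source for _ in impulse]   # MapElements.via(element -> source)
    out = []
    for s in sources:
        out.extend(sdf_wrapper(s))        # ParDo(UnboundedSourceAsSDFWrapperFn)
    return out
```

The point of the indirection, per the code comment, is to avoid `Create` (itself a BoundedSource), which would recurse into this same expansion.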
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393869=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393869 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 00:27 Start Date: 27/Feb/20 00:27 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384846699 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. 
+ @UnboundedPerElement + static class UnboundedSourceAsSDFWrapperFn + extends DoFn, ValueWithRecordId> { + +private static final int DEFAULT_DESIRED_NUM_SPLITS = 20; +private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10; +private final Coder restrictionCoder; + +private UnboundedSourceAsSDFWrapperFn(Coder restrictionCoder) { + this.restrictionCoder = restrictionCoder; +} + +@GetInitialRestriction +public KV, CheckpointT> initialRestriction( +@Element UnboundedSource element) { + return KV.of(element, null); +} + +@GetSize +public double getSize( +@Restriction KV, CheckpointT> restriction, +PipelineOptions pipelineOptions) +throws Exception { + if (restriction.getKey() instanceof EmptyUnboundedSource) { +return 1; + } + + UnboundedReader reader = + restriction.getKey().createReader(pipelineOptions, restriction.getValue()); + long size = reader.getSplitBacklogBytes(); + if (size != UnboundedReader.BACKLOG_UNKNOWN) { +return size; + } + // TODO: Support "global" backlog reporting + // size = reader.getTotalBacklogBytes(); + // if (size != UnboundedReader.BACKLOG_UNKNOWN) { + // return size; + // } + return 1; +} + +@SplitRestriction +public void splitRestriction( +@Restriction KV, CheckpointT> restriction, +OutputReceiver, CheckpointT>> receiver, +PipelineOptions pipelineOptions) +throws Exception { + // The empty unbounded source is trivially done and hence we don't need to output any splits + // for it. + if (restriction.getKey() instanceof EmptyUnboundedSource) { +return; + } + + // The UnboundedSource API does not support splitting after a meaningful checkpoint mark has + // been created. 
+ if (restriction.getValue() != null + && !(restriction.getValue() + instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) { +receiver.output(restriction); + } + + try { +for (UnboundedSource split : +restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) { + receiver.output(KV.of(split, null)); +} + } catch (Exception e) { +receiver.output(restriction); + } +} + +@NewTracker +public RestrictionTracker< +KV, CheckpointT>, UnboundedSourceValue[]> +restrictionTracker( +@Restriction KV, CheckpointT> restriction, +PipelineOptions pipelineOptions) { + return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions); +} + +@ProcessElement +public ProcessContinuation processElement( +RestrictionTracker< +KV, CheckpointT>, UnboundedSourceValue[]> +tracker, +OutputReceiver> receiver, +BundleFinalizer bundleFinalizer) +throws IOException { + UnboundedSourceValue[] out = new UnboundedSourceValue[1]; + while (tracker.tryClaim(out)) { +receiver.outputWithTimestamp( +new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp()); + } + + // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial. + KV, CheckpointT> currentRestriction = + tracker.currentRestriction(); + if (currentRestriction.getValue() != null + && !(tracker.currentRestriction().getValue() instanceof NoopCheckpointMark)) { +bundleFinalizer.afterBundleCommit( +
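In the wrapper above, a restriction is a (sub-source, checkpoint) pair, and the comment states that splitting is only supported while the checkpoint is trivial (`null` or a `NoopCheckpointMark`). A toy sketch of that intent (the stated rule, not the exact Java control flow, with hypothetical stand-in types):

```python
NOOP = object()  # stands in for UnboundedSource.CheckpointMark.NoopCheckpointMark


def split_restriction(split_fn, restriction):
    """restriction is a (source, checkpoint) pair, as in the SDF wrapper."""
    source, checkpoint = restriction
    # A meaningful (non-noop) checkpoint forbids further splitting:
    # hand the restriction back unsplit.
    if checkpoint is not None and checkpoint is not NOOP:
        return [restriction]
    # Otherwise emit one fresh (sub-source, None) restriction per split.
    return [(split, None) for split in split_fn(source)]
```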
[jira] [Work logged] (BEAM-8575) Add more Python validates runner tests
[ https://issues.apache.org/jira/browse/BEAM-8575?focusedWorklogId=393867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393867 ] ASF GitHub Bot logged work on BEAM-8575: Author: ASF GitHub Bot Created on: 27/Feb/20 00:26 Start Date: 27/Feb/20 00:26 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10951: [BEAM-8575] Modified the test to work for different runners. URL: https://github.com/apache/beam/pull/10951#discussion_r384848033 ## File path: sdks/python/apache_beam/transforms/combiners_test.py ## @@ -470,6 +470,9 @@ def test_combining_with_accumulation_mode_and_fanout(self): ts.add_elements([i]) ts.advance_watermark_to_infinity() +def is_early_firing(element, num_partitions): + return 0 if element < 15 else 1 Review comment: This was fixed last year. Looks like the bug was never closed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393867) Time Spent: 57h 20m (was: 57h 10m) > Add more Python validates runner tests > -- > > Key: BEAM-8575 > URL: https://issues.apache.org/jira/browse/BEAM-8575 > Project: Beam > Issue Type: Test > Components: sdk-py-core, testing >Reporter: wendy liu >Assignee: wendy liu >Priority: Major > Time Spent: 57h 20m > Remaining Estimate: 0h > > This is the umbrella issue to track the work of adding more Python tests to > improve test coverage. -- This message was sent by Atlassian Jira (v8.3.4#803005)
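The `is_early_firing` helper in the diff has the `(element, num_partitions) -> index` signature that `beam.Partition` expects from a partition function. A minimal stand-alone sketch of that contract (plain Python, no Beam dependency):

```python
def is_early_firing(element, num_partitions):
    # Route elements below 15 to partition 0 ("early"), the rest to 1.
    return 0 if element < 15 else 1


def partition(elements, fn, num_partitions=2):
    """Tiny stand-in for beam.Partition: bucket elements by fn's index."""
    parts = [[] for _ in range(num_partitions)]
    for e in elements:
        parts[fn(e, num_partitions)].append(e)
    return parts
```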
[jira] [Resolved] (BEAM-8435) Allow access to PaneInfo from Python DoFns
[ https://issues.apache.org/jira/browse/BEAM-8435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Bradshaw resolved BEAM-8435. --- Fix Version/s: 2.19.0 Resolution: Duplicate > Allow access to PaneInfo from Python DoFns > -- > > Key: BEAM-8435 > URL: https://issues.apache.org/jira/browse/BEAM-8435 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Fix For: 2.19.0 > > Time Spent: 2h > Remaining Estimate: 0h > > PaneInfoParam exists, but the plumbing to actually populate it at runtime was > never added. (Nor, clearly, were any tests...) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-3759) Add support for PaneInfo descriptor in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Bradshaw resolved BEAM-3759. --- Fix Version/s: 2.19.0 Resolution: Fixed > Add support for PaneInfo descriptor in Python SDK > - > > Key: BEAM-3759 > URL: https://issues.apache.org/jira/browse/BEAM-3759 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Affects Versions: 2.3.0 >Reporter: Charles Chen >Assignee: Tanay Tummalapalli >Priority: Major > Fix For: 2.19.0 > > Time Spent: 3h 40m > Remaining Estimate: 0h > > The PaneInfo descriptor allows a user to determine which particular > triggering emitted a value. This allows the user to differentiate between > speculative (early), on-time (at end of window) and late value emissions > coming out of a GroupByKey. We should add support for this feature in the > Python SDK. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8575) Add more Python validates runner tests
[ https://issues.apache.org/jira/browse/BEAM-8575?focusedWorklogId=393865=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393865 ] ASF GitHub Bot logged work on BEAM-8575: Author: ASF GitHub Bot Created on: 27/Feb/20 00:18 Start Date: 27/Feb/20 00:18 Worklog Time Spent: 10m Work Description: bumblebee-coming commented on pull request #10951: [BEAM-8575] Modified the test to work for different runners. URL: https://github.com/apache/beam/pull/10951#discussion_r384845501 ## File path: sdks/python/apache_beam/transforms/combiners_test.py ## @@ -470,6 +470,9 @@ def test_combining_with_accumulation_mode_and_fanout(self): ts.add_elements([i]) ts.advance_watermark_to_infinity() +def is_early_firing(element, num_partitions): + return 0 if element < 15 else 1 Review comment: PaneInfo is not supported yet in Python. https://issues.apache.org/jira/browse/BEAM-3759 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393865) Time Spent: 57h 10m (was: 57h) > Add more Python validates runner tests > -- > > Key: BEAM-8575 > URL: https://issues.apache.org/jira/browse/BEAM-8575 > Project: Beam > Issue Type: Test > Components: sdk-py-core, testing >Reporter: wendy liu >Assignee: wendy liu >Priority: Major > Time Spent: 57h 10m > Remaining Estimate: 0h > > This is the umbrella issue to track the work of adding more Python tests to > improve test coverage. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393864=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393864 ] ASF GitHub Bot logged work on BEAM-8841: Author: ASF GitHub Bot Created on: 27/Feb/20 00:14 Start Date: 27/Feb/20 00:14 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK URL: https://github.com/apache/beam/pull/10979#issuecomment-591710321 Run Python 3.7 PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393864) Time Spent: 1h 40m (was: 1.5h) > Add ability to perform BigQuery file loads using avro > - > > Key: BEAM-8841 > URL: https://issues.apache.org/jira/browse/BEAM-8841 > Project: Beam > Issue Type: Improvement > Components: io-py-gcp >Reporter: Chun Yang >Assignee: Chun Yang >Priority: Minor > Time Spent: 1h 40m > Remaining Estimate: 0h > > Currently, JSON format is used for file loads into BigQuery in the Python > SDK. JSON has some disadvantages including size of serialized data and > inability to represent NaN and infinity float values. > BigQuery supports loading files in avro format, which can overcome these > disadvantages. The Java SDK already supports loading files using avro format > (BEAM-2879) so it makes sense to support it in the Python SDK as well. > The change will be somewhere around > [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393860=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393860 ] ASF GitHub Bot logged work on BEAM-8841: Author: ASF GitHub Bot Created on: 27/Feb/20 00:04 Start Date: 27/Feb/20 00:04 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK URL: https://github.com/apache/beam/pull/10979#issuecomment-591707480 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393860) Time Spent: 1.5h (was: 1h 20m) > Add ability to perform BigQuery file loads using avro > - > > Key: BEAM-8841 > URL: https://issues.apache.org/jira/browse/BEAM-8841 > Project: Beam > Issue Type: Improvement > Components: io-py-gcp >Reporter: Chun Yang >Assignee: Chun Yang >Priority: Minor > Time Spent: 1.5h > Remaining Estimate: 0h > > Currently, JSON format is used for file loads into BigQuery in the Python > SDK. JSON has some disadvantages including size of serialized data and > inability to represent NaN and infinity float values. > BigQuery supports loading files in avro format, which can overcome these > disadvantages. The Java SDK already supports loading files using avro format > (BEAM-2879) so it makes sense to support it in the Python SDK as well. > The change will be somewhere around > [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9381) Add display data to BoundedSourceSDF
[ https://issues.apache.org/jira/browse/BEAM-9381?focusedWorklogId=393859=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393859 ] ASF GitHub Bot logged work on BEAM-9381: Author: ASF GitHub Bot Created on: 27/Feb/20 00:02 Start Date: 27/Feb/20 00:02 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10968: [BEAM-9381] Adding display data to BoundedSource SDF URL: https://github.com/apache/beam/pull/10968#issuecomment-591706858 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393859) Time Spent: 1h 20m (was: 1h 10m) > Add display data to BoundedSourceSDF > - > > Key: BEAM-9381 > URL: https://issues.apache.org/jira/browse/BEAM-9381 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7746) Add type hints to python code
[ https://issues.apache.org/jira/browse/BEAM-7746?focusedWorklogId=393852=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393852 ] ASF GitHub Bot logged work on BEAM-7746: Author: ASF GitHub Bot Created on: 26/Feb/20 23:48 Start Date: 26/Feb/20 23:48 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #10822: [BEAM-7746] Minor typing updates / fixes URL: https://github.com/apache/beam/pull/10822#discussion_r384836306 ## File path: sdks/python/apache_beam/transforms/external_java.py ## @@ -37,18 +39,19 @@ # Protect against environments where apitools library is not available. # pylint: disable=wrong-import-order, wrong-import-position +apiclient = None # type: Optional[types.ModuleType] Review comment: I did some more research on this, and I found this mypy issue: https://github.com/python/mypy/issues/1297 It suggests this idiom: ```python try: from apache_beam.runners.dataflow.internal import apiclient as _apiclient except ImportError: apiclient = None else: apiclient = _apiclient ``` The import is a bit longer and uglier, but it has 2 advantages: - no need to import `Optional` or `ModuleType` - the idiom I was using was actually making `apiclient` a generic ModuleType, dropping all knowledge of the members of `apache_beam.runners.dataflow.internal`. That's bad! The reason this works without explicit `Optional` annotation that mypy will automatically determine optionality in some cases, like this: ```python if some_conditional(): x = None else: x = 1 reveal_type(x) # Revealed type is 'Union[builtins.int, None]' ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393852) Time Spent: 67h 20m (was: 67h 10m) > Add type hints to python code > - > > Key: BEAM-7746 > URL: https://issues.apache.org/jira/browse/BEAM-7746 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chad Dombrova >Assignee: Chad Dombrova >Priority: Major > Time Spent: 67h 20m > Remaining Estimate: 0h > > As a developer of the beam source code, I would like the code to use pep484 > type hints so that I can clearly see what types are required, get completion > in my IDE, and enforce code correctness via a static analyzer like mypy. > This may be considered a precursor to BEAM-7060 > Work has been started here: [https://github.com/apache/beam/pull/9056] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
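The try/except/else idiom chadrik describes can be exercised with any stdlib module; here is a sketch using `json` in place of the optional `apiclient` import. The extra `else` branch keeps the module's real type visible to mypy (rather than collapsing it to a generic `ModuleType`) while still letting the name be `None` when the import fails:

```python
try:
    import json as _json  # stands in for the optional apiclient import
except ImportError:
    json_mod = None
else:
    json_mod = _json

# Callers must guard on availability, exactly as with the optional apiclient;
# mypy infers Optional[...] for json_mod without an explicit annotation.
if json_mod is not None:
    decoded = json_mod.loads('{"ok": true}')
```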
[jira] [Work logged] (BEAM-8965) WriteToBigQuery failed in BundleBasedDirectRunner
[ https://issues.apache.org/jira/browse/BEAM-8965?focusedWorklogId=393842=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393842 ] ASF GitHub Bot logged work on BEAM-8965: Author: ASF GitHub Bot Created on: 26/Feb/20 23:36 Start Date: 26/Feb/20 23:36 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #10901: [BEAM-8965] Remove duplicate sideinputs in ConsumerTrackingPipelineVisitor URL: https://github.com/apache/beam/pull/10901 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393842) Time Spent: 2h 20m (was: 2h 10m) > WriteToBigQuery failed in BundleBasedDirectRunner > - > > Key: BEAM-8965 > URL: https://issues.apache.org/jira/browse/BEAM-8965 > Project: Beam > Issue Type: Bug > Components: io-py-gcp >Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0 >Reporter: Wenbing Bai >Assignee: Wenbing Bai >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > *{{WriteToBigQuery}}* fails in *{{BundleBasedDirectRunner}}* with error > {{PCollection of size 2 with more than one element accessed as a singleton > view.}} > Here is the code > > {code:python} > with Pipeline() as p: > query_results = ( > p > | beam.io.Read(beam.io.BigQuerySource( > query='SELECT ... 
FROM ...') > ) > query_results | beam.io.gcp.WriteToBigQuery( > table=, > method=WriteToBigQuery.Method.FILE_LOADS, > schema={"fields": []} > ) > {code} > > Here is the error > > {code:none} > File "apache_beam/runners/common.py", line 778, in > apache_beam.runners.common.DoFnRunner.process > def process(self, windowed_value): > File "apache_beam/runners/common.py", line 782, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 849, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 780, in > apache_beam.runners.common.DoFnRunner.process > return self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 587, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_process_per_window( > File "apache_beam/runners/common.py", line 610, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > [si[global_window] for si in self.side_inputs])) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/transforms/sideinputs.py", > line 65, in __getitem__ > _FilteringIterable(self._iterable, target_window), self._view_options) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/pvalue.py", > line 443, in _from_runtime_iterable > len(head), str(head[0]), str(head[1]))) > ValueError: PCollection of size 2 with more than one element accessed as a > singleton view. First two elements encountered are > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f", > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f". [while running > 'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)'] > {code} > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
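The traceback bottoms out in the singleton-view check in `pvalue.py`: a side input declared as a singleton materialized with two elements (here, the same file path duplicated), so Beam raised. A simplified sketch of that check (not the actual Beam code):

```python
def as_singleton(iterable):
    """Return the sole element, mimicking AsSingleton side-input semantics."""
    head = list(iterable)[:2]
    if not head:
        raise ValueError("Empty PCollection accessed as a singleton view.")
    if len(head) > 1:
        raise ValueError(
            "PCollection of size 2 with more than one element accessed as "
            'a singleton view. First two elements encountered are '
            '"%s", "%s".' % (head[0], head[1]))
    return head[0]
```

The linked fix (#10901) de-duplicates side inputs in the pipeline visitor, so the view the runner hands to this check really does contain one element.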
[jira] [Work logged] (BEAM-8965) WriteToBigQuery failed in BundleBasedDirectRunner
[ https://issues.apache.org/jira/browse/BEAM-8965?focusedWorklogId=393843=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393843 ] ASF GitHub Bot logged work on BEAM-8965: Author: ASF GitHub Bot Created on: 26/Feb/20 23:36 Start Date: 26/Feb/20 23:36 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10901: [BEAM-8965] Remove duplicate sideinputs in ConsumerTrackingPipelineVisitor URL: https://github.com/apache/beam/pull/10901#issuecomment-591699695 thanks @bobingm This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393843) Time Spent: 2.5h (was: 2h 20m) > WriteToBigQuery failed in BundleBasedDirectRunner > - > > Key: BEAM-8965 > URL: https://issues.apache.org/jira/browse/BEAM-8965 > Project: Beam > Issue Type: Bug > Components: io-py-gcp >Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0 >Reporter: Wenbing Bai >Assignee: Wenbing Bai >Priority: Major > Time Spent: 2.5h > Remaining Estimate: 0h > > *{{WriteToBigQuery}}* fails in *{{BundleBasedDirectRunner}}* with error > {{PCollection of size 2 with more than one element accessed as a singleton > view.}} > Here is the code > > {code:python} > with Pipeline() as p: > query_results = ( > p > | beam.io.Read(beam.io.BigQuerySource( > query='SELECT ... 
FROM ...') > ) > query_results | beam.io.gcp.WriteToBigQuery( > table=, > method=WriteToBigQuery.Method.FILE_LOADS, > schema={"fields": []} > ) > {code} > > Here is the error > > {code:none} > File "apache_beam/runners/common.py", line 778, in > apache_beam.runners.common.DoFnRunner.process > def process(self, windowed_value): > File "apache_beam/runners/common.py", line 782, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 849, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 780, in > apache_beam.runners.common.DoFnRunner.process > return self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 587, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_process_per_window( > File "apache_beam/runners/common.py", line 610, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > [si[global_window] for si in self.side_inputs])) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/transforms/sideinputs.py", > line 65, in __getitem__ > _FilteringIterable(self._iterable, target_window), self._view_options) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/pvalue.py", > line 443, in _from_runtime_iterable > len(head), str(head[0]), str(head[1]))) > ValueError: PCollection of size 2 with more than one element accessed as a > singleton view. First two elements encountered are > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f", > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f". [while running > 'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)'] > {code} > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8965) WriteToBigQuery failed in BundleBasedDirectRunner
[ https://issues.apache.org/jira/browse/BEAM-8965?focusedWorklogId=393841=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393841 ] ASF GitHub Bot logged work on BEAM-8965: Author: ASF GitHub Bot Created on: 26/Feb/20 23:35 Start Date: 26/Feb/20 23:35 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10901: [BEAM-8965] Remove duplicate sideinputs in ConsumerTrackingPipelineVisitor URL: https://github.com/apache/beam/pull/10901#issuecomment-591699495 I see. Thanks for pointing that out. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393841) Time Spent: 2h 10m (was: 2h) > WriteToBigQuery failed in BundleBasedDirectRunner > - > > Key: BEAM-8965 > URL: https://issues.apache.org/jira/browse/BEAM-8965 > Project: Beam > Issue Type: Bug > Components: io-py-gcp >Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0 >Reporter: Wenbing Bai >Assignee: Wenbing Bai >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > *{{WriteToBigQuery}}* fails in *{{BundleBasedDirectRunner}}* with error > {{PCollection of size 2 with more than one element accessed as a singleton > view.}} > Here is the code > > {code:python} > with Pipeline() as p: > query_results = ( > p > | beam.io.Read(beam.io.BigQuerySource( > query='SELECT ... 
FROM ...') > ) > query_results | beam.io.gcp.WriteToBigQuery( > table=, > method=WriteToBigQuery.Method.FILE_LOADS, > schema={"fields": []} > ) > {code} > > Here is the error > > {code:none} > File "apache_beam/runners/common.py", line 778, in > apache_beam.runners.common.DoFnRunner.process > def process(self, windowed_value): > File "apache_beam/runners/common.py", line 782, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 849, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 780, in > apache_beam.runners.common.DoFnRunner.process > return self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 587, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_process_per_window( > File "apache_beam/runners/common.py", line 610, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > [si[global_window] for si in self.side_inputs])) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/transforms/sideinputs.py", > line 65, in __getitem__ > _FilteringIterable(self._iterable, target_window), self._view_options) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/pvalue.py", > line 443, in _from_runtime_iterable > len(head), str(head[0]), str(head[1]))) > ValueError: PCollection of size 2 with more than one element accessed as a > singleton view. First two elements encountered are > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f", > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f". [while running > 'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)'] > {code} > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393836=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393836 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 26/Feb/20 23:28 Start Date: 26/Feb/20 23:28 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384820354 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. 
+  @UnboundedPerElement
+  static class UnboundedSourceAsSDFWrapperFn<OutputT, CheckpointT extends UnboundedSource.CheckpointMark>
+      extends DoFn<UnboundedSource<OutputT, CheckpointT>, ValueWithRecordId<OutputT>> {
+
+    private static final int DEFAULT_DESIRED_NUM_SPLITS = 20;
+    private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
+    private final Coder<CheckpointT> restrictionCoder;
+
+    private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> restrictionCoder) {
+      this.restrictionCoder = restrictionCoder;
+    }
+
+    @GetInitialRestriction
+    public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction(
+        @Element UnboundedSource<OutputT, CheckpointT> element) {
+      return KV.of(element, null);
+    }
+
+    @GetSize
+    public double getSize(
+        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+        return 1;
+      }
+
+      UnboundedReader<OutputT> reader =
+          restriction.getKey().createReader(pipelineOptions, restriction.getValue());
+      long size = reader.getSplitBacklogBytes();
+      if (size != UnboundedReader.BACKLOG_UNKNOWN) {
+        return size;
+      }
+      // TODO: Support "global" backlog reporting
+      // size = reader.getTotalBacklogBytes();
+      // if (size != UnboundedReader.BACKLOG_UNKNOWN) {
+      //   return size;
+      // }
+      return 1;
+    }
+
+    @SplitRestriction
+    public void splitRestriction(
+        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+        OutputReceiver<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> receiver,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      // The empty unbounded source is trivially done and hence we don't need to output any splits
+      // for it.
+      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+        return;
+      }
+
+      // The UnboundedSource API does not support splitting after a meaningful checkpoint mark has
+      // been created.
+      if (restriction.getValue() != null
+          && !(restriction.getValue()
+              instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) {
+        receiver.output(restriction);
+      }
+
+      try {
+        for (UnboundedSource<OutputT, CheckpointT> split :
+            restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) {
+          receiver.output(KV.of(split, null));
+        }
+      } catch (Exception e) {
+        receiver.output(restriction);
+      }
+    }
+
+    @NewTracker
+    public RestrictionTracker<
+            KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]>
+        restrictionTracker(
+            @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+            PipelineOptions pipelineOptions) {
+      return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions);
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+        RestrictionTracker<
+                KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>,
+                UnboundedSourceValue<OutputT>[]>
+            tracker,
+        OutputReceiver<ValueWithRecordId<OutputT>> receiver,
+        BundleFinalizer bundleFinalizer)
+        throws IOException {
+      UnboundedSourceValue<OutputT>[] out = new UnboundedSourceValue[1];
+      while (tracker.tryClaim(out)) {
+        receiver.outputWithTimestamp(
+            new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp());
+      }
+
+      // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial.
+      KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction =
+          tracker.currentRestriction();
+      if (currentRestriction.getValue() != null
+          && !(tracker.currentRestriction().getValue() instanceof NoopCheckpointMark)) {
+        bundleFinalizer.afterBundleCommit(
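The element/restriction model described in the Javadoc above can be sketched in plain Python (a hypothetical miniature, not the Beam SDK API): the restriction is a `(source, checkpoint)` pair, and splitting is only allowed while the checkpoint is still trivial.

```python
from typing import List, Optional, Tuple

# Hypothetical stand-ins for UnboundedSource and CheckpointMark.
Source = str                # e.g. a description of a partition to read
Checkpoint = Optional[int]  # None means no reader state has been created yet
Restriction = Tuple[Source, Checkpoint]

NOOP_CHECKPOINT = -1  # analogue of NoopCheckpointMark, which carries no state

def split_restriction(restriction: Restriction,
                      desired_num_splits: int = 20) -> List[Restriction]:
    """Sketch of the wrapper's @SplitRestriction logic.

    A restriction may be split only while its checkpoint is None or the noop
    mark; once a meaningful checkpoint mark exists, the UnboundedSource API
    cannot split it, so the restriction is passed through unchanged.
    """
    source, checkpoint = restriction
    if checkpoint is not None and checkpoint != NOOP_CHECKPOINT:
        return [restriction]
    # No real state yet: the sub-source can be split over and over.
    return [('%s/split%d' % (source, i), None)
            for i in range(desired_num_splits)]

print(len(split_restriction(('topicA', None), desired_num_splits=4)))  # 4
print(split_restriction(('topicA', 42)))  # checkpointed: passed through
```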
[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python
[ https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=393839&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393839 ] ASF GitHub Bot logged work on BEAM-3342: Author: ASF GitHub Bot Created on: 26/Feb/20 23:28 Start Date: 26/Feb/20 23:28 Worklog Time Spent: 10m Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python URL: https://github.com/apache/beam/pull/8457#issuecomment-590610828 @chamikaramj PTAL. The long-standing issue now seems to be resolved. The reason was the namespace conflict between the existing `bigtableio.py` file and the extra package necessary for running the integration test. As it turns out, the two must have different names; otherwise Dataflow discards the package and uses the existing file, which of course does not have the newly added classes, hence the `AttributeError: 'module' object has no attribute...` error. Apparently this was not the case when this PR was originally created, so now we also have a caveat: - Until the code is merged, the only way to run the test is to change the package name in the import directive, `from bigtableio import ReadFromBigtable`, to something different, and use the external tarball package named accordingly. Upon merge, using an extra package will no longer be necessary and the test should run as-is. This was confirmed by running a sequence of nearly identical tests back-to-back, the instructions for which I can provide separately. In an attempt to reduce the code changes to a bare minimum, the write part of the test has been discarded, as it would test already accepted code anyway. Finally, I'd also suggest closing this PR and opening a new one, so as to make things cleaner. Let me know if this is a viable option. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393839) Time Spent: 44h 20m (was: 44h 10m) > Create a Cloud Bigtable IO connector for Python > --- > > Key: BEAM-3342 > URL: https://issues.apache.org/jira/browse/BEAM-3342 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Solomon Duskis >Assignee: Solomon Duskis >Priority: Major > Time Spent: 44h 20m > Remaining Estimate: 0h > > I would like to create a Cloud Bigtable python connector. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393838=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393838 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 26/Feb/20 23:28 Start Date: 26/Feb/20 23:28 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384826746 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. Review comment: JIRA for tracking? (Or would this be 10897?) Maybe update the PR/commit description to describe the current capabilities and limitations. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393838) Time Spent: 20h 50m (was: 20h 40m) > Fn API SDF support > -- > > Key: BEAM-2939 > URL: https://issues.apache.org/jira/browse/BEAM-2939 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Henning Rohde >Assignee: Luke Cwik >Priority: Major > Labels: portability > Time Spent: 20h 50m > Remaining Estimate: 0h > > The Fn API should support streaming SDF. Detailed design TBD. 
> Once design is ready, expand subtasks similarly to BEAM-2822. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393837&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393837 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 26/Feb/20 23:28 Start Date: 26/Feb/20 23:28 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384793593 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. + @UnboundedPerElement + static class UnboundedSourceAsSDFWrapperFn<OutputT, CheckpointT extends UnboundedSource.CheckpointMark> + extends DoFn<UnboundedSource<OutputT, CheckpointT>, ValueWithRecordId<OutputT>> { + + private static final int DEFAULT_DESIRED_NUM_SPLITS = 20; Review comment: It seems the risk is higher setting this too low than too high--maybe 100 or more? (Is there a JIRA for letting the runner pass this in?) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393837) Time Spent: 20h 40m (was: 20.5h) > Fn API SDF support > -- > > Key: BEAM-2939 > URL: https://issues.apache.org/jira/browse/BEAM-2939 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Henning Rohde >Assignee: Luke Cwik >Priority: Major > Labels: portability > Time Spent: 20h 40m > Remaining Estimate: 0h > > The Fn API should support streaming SDF. Detailed design TBD. > Once design is ready, expand subtasks similarly to BEAM-2822. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8965) WriteToBigQuery failed in BundleBasedDirectRunner
[ https://issues.apache.org/jira/browse/BEAM-8965?focusedWorklogId=393828=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393828 ] ASF GitHub Bot logged work on BEAM-8965: Author: ASF GitHub Bot Created on: 26/Feb/20 23:01 Start Date: 26/Feb/20 23:01 Worklog Time Spent: 10m Work Description: bobingm commented on issue #10901: [BEAM-8965] Remove duplicate sideinputs in ConsumerTrackingPipelineVisitor URL: https://github.com/apache/beam/pull/10901#issuecomment-591688753 @pabloem the failure is not caused by this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393828) Time Spent: 2h (was: 1h 50m) > WriteToBigQuery failed in BundleBasedDirectRunner > - > > Key: BEAM-8965 > URL: https://issues.apache.org/jira/browse/BEAM-8965 > Project: Beam > Issue Type: Bug > Components: io-py-gcp >Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0 >Reporter: Wenbing Bai >Assignee: Wenbing Bai >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > *{{WriteToBigQuery}}* fails in *{{BundleBasedDirectRunner}}* with error > {{PCollection of size 2 with more than one element accessed as a singleton > view.}} > Here is the code > > {code:python} > with Pipeline() as p: > query_results = ( > p > | beam.io.Read(beam.io.BigQuerySource( > query='SELECT ... 
FROM ...') > ) > query_results | beam.io.gcp.WriteToBigQuery( > table=, > method=WriteToBigQuery.Method.FILE_LOADS, > schema={"fields": []} > ) > {code} > > Here is the error > > {code:none} > File "apache_beam/runners/common.py", line 778, in > apache_beam.runners.common.DoFnRunner.process > def process(self, windowed_value): > File "apache_beam/runners/common.py", line 782, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 849, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 780, in > apache_beam.runners.common.DoFnRunner.process > return self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 587, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_process_per_window( > File "apache_beam/runners/common.py", line 610, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > [si[global_window] for si in self.side_inputs])) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/transforms/sideinputs.py", > line 65, in __getitem__ > _FilteringIterable(self._iterable, target_window), self._view_options) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/pvalue.py", > line 443, in _from_runtime_iterable > len(head), str(head[0]), str(head[1]))) > ValueError: PCollection of size 2 with more than one element accessed as a > singleton view. First two elements encountered are > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f", > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f". [while running > 'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)'] > {code} > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-3188) [Calcite SQL] Query Parametrization
[ https://issues.apache.org/jira/browse/BEAM-3188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver updated BEAM-3188: -- Summary: [Calcite SQL] Query Parametrization (was: [SQL] Query Parametrization) > [Calcite SQL] Query Parametrization > --- > > Key: BEAM-3188 > URL: https://issues.apache.org/jira/browse/BEAM-3188 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Kyle Weaver >Priority: Major > > Look into SQL query parametrization/templating. > Calcite supports parameters: > https://issues.apache.org/jira/browse/CALCITE-2054 > Beam does not: > {code:java} > Caused by: java.lang.UnsupportedOperationException: class > org.apache.calcite.rex.RexDynamicParam is not supported yet! > at > org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor.buildExpression(BeamSqlFnExecutor.java:430) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8591) Exception is thrown while running Beam Pipeline on Kubernetes Flink Cluster.
[ https://issues.apache.org/jira/browse/BEAM-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver resolved BEAM-8591. --- Fix Version/s: Not applicable Resolution: Not A Problem > Exception is thrown while running Beam Pipeline on Kubernetes Flink Cluster. > > > Key: BEAM-8591 > URL: https://issues.apache.org/jira/browse/BEAM-8591 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Mingliang Gong >Assignee: Kyle Weaver >Priority: Major > Fix For: Not applicable > > > h2. Setup Clusters > * Setup Local Flink Cluster: > [https://ci.apache.org/projects/flink/flink-docs-release-1.8/tutorials/local_setup.html] > * Setup Kubernetes Flink Cluster using Minikube: > [https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html] > h2. Verify Clusters > Execute command “./bin/flink run examples/streaming/WordCount.jar”. Both > Local and K8S Flink Cluster work fine. > h2. Using Apache Beam Flink Runner > Instruction: [https://beam.apache.org/documentation/runners/flink/] > Sample Pipeline Code: > {code:java} > import apache_beam as beam > from apache_beam.options.pipeline_options import PipelineOptions > options = PipelineOptions([ > "--runner=PortableRunner", > "--job_endpoint=localhost:8099", > "--environment_type=LOOPBACK" > ]) > with beam.Pipeline(options=options) as pipeline: > data = ["Sample data", > "Sample data - 0", > "Sample data - 1"] > raw_data = (pipeline > | 'CreateHardCodeData' >> beam.Create(data) > | 'Map' >> beam.Map(lambda line : line + '.') > | 'Print' >> beam.Map(print)){code} > Verify different environment_type in Python SDK Harness Configuration > *environment_type=LOOPBACK* > # Run pipeline on local cluster: Works Fine > # Run pipeline on K8S cluster, Exceptions are thrown: > java.lang.Exception: The user defined 'open()' method caused an exception: > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: > UNAVAILABLE: io exception Caused by: > 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.AbstractChannel$AnnotatedConnectException: > Connection refused: localhost/127.0.0.1:51017 > *environment_type=DOCKER* > # Run pipeline on local cluster: Work fine > # Run pipeline on K8S cluster, Exception are thrown: > Caused by: java.io.IOException: Cannot run program "docker": error=2, No > such file or directory. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (BEAM-8460) Portable Flink runner fails UsesStrictTimerOrdering category tests
[ https://issues.apache.org/jira/browse/BEAM-8460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver reopened BEAM-8460: --- > Portable Flink runner fails UsesStrictTimerOrdering category tests > -- > > Key: BEAM-8460 > URL: https://issues.apache.org/jira/browse/BEAM-8460 > Project: Beam > Issue Type: Bug > Components: runner-flink >Affects Versions: 2.17.0 >Reporter: Jan Lukavský >Assignee: Kyle Weaver >Priority: Major > Fix For: Not applicable > > Time Spent: 0.5h > Remaining Estimate: 0h > > BEAM-7520 introduced new set of validatesRunner tests that test that timers > are fired exactly in order of increasing timestamp. Portable Flink runner > fails these added tests (are currently ignored). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-8460) Portable Flink runner ignores UsesStrictTimerOrdering category tests
[ https://issues.apache.org/jira/browse/BEAM-8460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver updated BEAM-8460: -- Summary: Portable Flink runner ignores UsesStrictTimerOrdering category tests (was: Portable Flink runner fails UsesStrictTimerOrdering category tests) > Portable Flink runner ignores UsesStrictTimerOrdering category tests > > > Key: BEAM-8460 > URL: https://issues.apache.org/jira/browse/BEAM-8460 > Project: Beam > Issue Type: Bug > Components: runner-flink >Affects Versions: 2.17.0 >Reporter: Jan Lukavský >Assignee: Kyle Weaver >Priority: Major > Fix For: Not applicable > > Time Spent: 0.5h > Remaining Estimate: 0h > > BEAM-7520 introduced new set of validatesRunner tests that test that timers > are fired exactly in order of increasing timestamp. Portable Flink runner > fails these added tests (are currently ignored). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8460) Portable Flink runner fails UsesStrictTimerOrdering category tests
[ https://issues.apache.org/jira/browse/BEAM-8460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver resolved BEAM-8460. --- Fix Version/s: Not applicable Resolution: Fixed > Portable Flink runner fails UsesStrictTimerOrdering category tests > -- > > Key: BEAM-8460 > URL: https://issues.apache.org/jira/browse/BEAM-8460 > Project: Beam > Issue Type: Bug > Components: runner-flink >Affects Versions: 2.17.0 >Reporter: Jan Lukavský >Assignee: Kyle Weaver >Priority: Major > Fix For: Not applicable > > Time Spent: 0.5h > Remaining Estimate: 0h > > BEAM-7520 introduced new set of validatesRunner tests that test that timers > are fired exactly in order of increasing timestamp. Portable Flink runner > fails these added tests (are currently ignored). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-9088) "Container environments" out of date
[ https://issues.apache.org/jira/browse/BEAM-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver resolved BEAM-9088. --- Fix Version/s: Not applicable Resolution: Fixed > "Container environments" out of date > > > Key: BEAM-9088 > URL: https://issues.apache.org/jira/browse/BEAM-9088 > Project: Beam > Issue Type: Improvement > Components: website >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Fix For: Not applicable > > > https://beam.apache.org/documentation/runtime/environments/ > States that the default tag is `latest`, but that is no longer true. (Default > tag is 2.19.0.dev etc.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7746) Add type hints to python code
[ https://issues.apache.org/jira/browse/BEAM-7746?focusedWorklogId=393825=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393825 ] ASF GitHub Bot logged work on BEAM-7746: Author: ASF GitHub Bot Created on: 26/Feb/20 22:52 Start Date: 26/Feb/20 22:52 Worklog Time Spent: 10m Work Description: chadrik commented on issue #10822: [BEAM-7746] Minor typing updates / fixes URL: https://github.com/apache/beam/pull/10822#issuecomment-591685577 Run Python2_PVR_Flink PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393825) Time Spent: 67h 10m (was: 67h) > Add type hints to python code > - > > Key: BEAM-7746 > URL: https://issues.apache.org/jira/browse/BEAM-7746 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chad Dombrova >Assignee: Chad Dombrova >Priority: Major > Time Spent: 67h 10m > Remaining Estimate: 0h > > As a developer of the beam source code, I would like the code to use pep484 > type hints so that I can clearly see what types are required, get completion > in my IDE, and enforce code correctness via a static analyzer like mypy. > This may be considered a precursor to BEAM-7060 > Work has been started here: [https://github.com/apache/beam/pull/9056] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
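As a small illustration of the kind of PEP 484 annotation work BEAM-7746 covers (the functions below are hypothetical helpers, not Beam code), annotated signatures let mypy verify callers and catch type errors statically:

```python
from typing import Dict, Iterable, List

def count_words(lines: Iterable[str]) -> Dict[str, int]:
    """Count word occurrences; annotations document and enforce the contract."""
    counts: Dict[str, int] = {}
    for line in lines:
        for word in line.split():
            counts[word] = counts.get(word, 0) + 1
    return counts

def top_words(counts: Dict[str, int], n: int = 3) -> List[str]:
    """Return the n most frequent words; mypy flags e.g. top_words(counts, '3')."""
    return sorted(counts, key=counts.get, reverse=True)[:n]

print(count_words(['a b a']))  # {'a': 2, 'b': 1}
```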
[jira] [Work logged] (BEAM-8965) WriteToBigQuery failed in BundleBasedDirectRunner
[ https://issues.apache.org/jira/browse/BEAM-8965?focusedWorklogId=393812=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393812 ] ASF GitHub Bot logged work on BEAM-8965: Author: ASF GitHub Bot Created on: 26/Feb/20 22:36 Start Date: 26/Feb/20 22:36 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10901: [BEAM-8965] Remove duplicate sideinputs in ConsumerTrackingPipelineVisitor URL: https://github.com/apache/beam/pull/10901#issuecomment-591680118 thanks @bobingm ! Looks good to me. Can you fix the formatting issues, and I'll merge? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393812) Time Spent: 1h 50m (was: 1h 40m) > WriteToBigQuery failed in BundleBasedDirectRunner > - > > Key: BEAM-8965 > URL: https://issues.apache.org/jira/browse/BEAM-8965 > Project: Beam > Issue Type: Bug > Components: io-py-gcp >Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0 >Reporter: Wenbing Bai >Assignee: Wenbing Bai >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > *{{WriteToBigQuery}}* fails in *{{BundleBasedDirectRunner}}* with error > {{PCollection of size 2 with more than one element accessed as a singleton > view.}} > Here is the code > > {code:python} > with Pipeline() as p: > query_results = ( > p > | beam.io.Read(beam.io.BigQuerySource( > query='SELECT ... 
FROM ...') > ) > query_results | beam.io.gcp.WriteToBigQuery( > table=, > method=WriteToBigQuery.Method.FILE_LOADS, > schema={"fields": []} > ) > {code} > > Here is the error > > {code:none} > File "apache_beam/runners/common.py", line 778, in > apache_beam.runners.common.DoFnRunner.process > def process(self, windowed_value): > File "apache_beam/runners/common.py", line 782, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 849, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 780, in > apache_beam.runners.common.DoFnRunner.process > return self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 587, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_process_per_window( > File "apache_beam/runners/common.py", line 610, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > [si[global_window] for si in self.side_inputs])) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/transforms/sideinputs.py", > line 65, in __getitem__ > _FilteringIterable(self._iterable, target_window), self._view_options) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/pvalue.py", > line 443, in _from_runtime_iterable > len(head), str(head[0]), str(head[1]))) > ValueError: PCollection of size 2 with more than one element accessed as a > singleton view. First two elements encountered are > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f", > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f". [while running > 'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)'] > {code} > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393813=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393813 ]

ASF GitHub Bot logged work on BEAM-9322:

Author: ASF GitHub Bot
Created on: 26/Feb/20 22:36
Start Date: 26/Feb/20 22:36
Worklog Time Spent: 10m

Work Description: aaltay commented on pull request #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled
URL: https://github.com/apache/beam/pull/10934

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 393813)
Time Spent: 1h 40m (was: 1.5h)

> Python SDK ignores manually set PCollection tags
>
> Key: BEAM-9322
> URL: https://issues.apache.org/jira/browse/BEAM-9322
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Sam Rohde
> Assignee: Sam Rohde
> Priority: Major
> Fix For: 2.20.0
>
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> The Python SDK currently ignores any tags set manually on PCollections when applying PTransforms, at the point where the PCollection is added to the PTransform [outputs|https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595].
> In the [add_output|https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872] method, the tag is set to None for all PValues, meaning the output tags are set to an enumeration index over the PCollection outputs. The tags are not propagated correctly, which can be a problem when relying on the output PCollection tags to match the user-set values.
> The fix is to correct BEAM-1833 and always pass in the tags. However, that doesn't fix the problem for nested PCollections. If you have a dict of lists of PCollections, what should their tags be correctly set to? In order to fix this, first propagate the correct tag, then talk with the community about the best auto-generated tags.
> Some users may rely on the old implementation, so a flag will be created, "force_generated_pcollection_output_ids", by default set to False. If True, this will go to the old implementation and generate tags for PCollections.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
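The two tagging behaviors the description contrasts can be sketched in plain Python. This is a simplified model, not Beam's code; both function names are invented for illustration:

```python
def generated_tags(outputs):
    """Old behavior (BEAM-1833): user-set tags are ignored and outputs
    are keyed by an enumeration index instead."""
    return {str(i): pcoll for i, pcoll in enumerate(outputs.values())}


def passthrough_tags(outputs):
    """Fixed behavior: the tags the user set survive unchanged."""
    return dict(outputs)


# A user who tagged outputs 'valid' and 'invalid' gets '0' and '1' back
# under the old behavior, which breaks lookups by the original tag names.
user_outputs = {'valid': 'pcoll_a', 'invalid': 'pcoll_b'}
```

The "force_generated_pcollection_output_ids" flag described above effectively selects between these two mappings.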
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393811=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393811 ]

ASF GitHub Bot logged work on BEAM-9322:

Author: ASF GitHub Bot
Created on: 26/Feb/20 22:34
Start Date: 26/Feb/20 22:34
Worklog Time Spent: 10m

Work Description: aaltay commented on pull request #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled
URL: https://github.com/apache/beam/pull/10934#discussion_r384809280

## File path: sdks/python/apache_beam/pipeline.py
##
@@ -620,23 +620,25 @@ def apply(self, transform, pvalueish=None, label=None):
         current.add_output(result, result._main_tag)
         continue
+      # TODO(BEAM-9322): Remove the experiment check and have this conditional
+      # be the default.
+      if self._options.view_as(DebugOptions).lookup_experiment(
+          'passthrough_pcollection_output_ids', default=False):
+        # Otherwise default to the new implementation which only auto-generates

Review comment: I think this comment could be improved, because this is not the otherwise case anymore. A follow-up PR would be fine.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 393811)
Time Spent: 1.5h (was: 1h 20m)

> Python SDK ignores manually set PCollection tags
>
> Key: BEAM-9322
> URL: https://issues.apache.org/jira/browse/BEAM-9322
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Sam Rohde
> Assignee: Sam Rohde
> Priority: Major
> Fix For: 2.20.0
>
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> The Python SDK currently ignores any tags set manually on PCollections when applying PTransforms, at the point where the PCollection is added to the PTransform [outputs|https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595].
> In the [add_output|https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872] method, the tag is set to None for all PValues, meaning the output tags are set to an enumeration index over the PCollection outputs. The tags are not propagated correctly, which can be a problem when relying on the output PCollection tags to match the user-set values.
> The fix is to correct BEAM-1833 and always pass in the tags. However, that doesn't fix the problem for nested PCollections. If you have a dict of lists of PCollections, what should their tags be correctly set to? In order to fix this, first propagate the correct tag, then talk with the community about the best auto-generated tags.
> Some users may rely on the old implementation, so a flag will be created, "force_generated_pcollection_output_ids", by default set to False. If True, this will go to the old implementation and generate tags for PCollections.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
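The diff in the review above gates the new tagging behavior behind Beam's experiment mechanism. A simplified standalone model of that gating (the class and method names mirror Beam's `DebugOptions.lookup_experiment` API, but this implementation is illustrative only):

```python
class DebugOptions:
    """Toy model of experiment lookup: experiments are either bare flag
    names ("foo") or key=value pairs ("foo=bar")."""

    def __init__(self, experiments=None):
        self.experiments = experiments or []

    def lookup_experiment(self, name, default=False):
        for exp in self.experiments:
            key, _, value = exp.partition('=')
            if key == name:
                # A bare flag reads as True; "key=value" returns the value.
                return value or True
        return default


# With the experiment unset, the default (False) keeps the old behavior,
# which is exactly what PR #10934 changes the default back to.
opts = DebugOptions(['passthrough_pcollection_output_ids'])
```

The `default=False` argument in the diff is what makes the experiment opt-in rather than opt-out.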
[jira] [Work logged] (BEAM-7746) Add type hints to python code
[ https://issues.apache.org/jira/browse/BEAM-7746?focusedWorklogId=393810=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393810 ]

ASF GitHub Bot logged work on BEAM-7746:

Author: ASF GitHub Bot
Created on: 26/Feb/20 22:33
Start Date: 26/Feb/20 22:33
Worklog Time Spent: 10m

Work Description: chadrik commented on issue #10822: [BEAM-7746] Minor typing updates / fixes
URL: https://github.com/apache/beam/pull/10822#issuecomment-591610348

Down to 79 errors! Btw, mypy may have revealed some legitimate errors in a recent change:

```
apache_beam/runners/worker/operations.py:837: error: "float" not callable [operator]
apache_beam/runners/worker/operations.py:838: error: "float" not callable [operator]
apache_beam/runners/worker/operations.py:839: error: "float" not callable [operator]
apache_beam/runners/worker/operations.py:841: error: "float" not callable [operator]
apache_beam/runners/worker/operations.py:842: error: "float" not callable [operator]
apache_beam/runners/worker/operations.py:846: error: Module has no attribute "LATEST_DOUBLES_URN"; maybe "LATEST_DOUBLES_TYPE"? [attr-defined]
apache_beam/runners/worker/operations.py:852: error: Module has no attribute "LATEST_DOUBLES_URN"; maybe "LATEST_DOUBLES_TYPE"? [attr-defined]
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 393810)
Time Spent: 67h (was: 66h 50m)

> Add type hints to python code
> -
>
> Key: BEAM-7746
> URL: https://issues.apache.org/jira/browse/BEAM-7746
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Chad Dombrova
> Assignee: Chad Dombrova
> Priority: Major
> Time Spent: 67h
> Remaining Estimate: 0h
>
> As a developer of the beam source code, I would like the code to use PEP 484 type hints so that I can clearly see what types are required, get completion in my IDE, and enforce code correctness via a static analyzer like mypy.
> This may be considered a precursor to BEAM-7060.
> Work has been started here: [https://github.com/apache/beam/pull/9056]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
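As an illustration of what PEP 484 annotations buy here: with hints like the ones below, mypy can statically flag the '"float" not callable' class of bug listed in the comment above, where a plain number ends up somewhere a callable was expected. This snippet is illustrative only, not Beam code; the registry and function names are invented:

```python
from typing import Callable, Dict

# Annotated registry: mypy now knows every value is a combiner callable.
# Storing a plain float here, e.g. combiners['latest_doubles'] = 1.0,
# would be rejected statically instead of failing at runtime with
# 'error: "float" not callable'.
combiners: Dict[str, Callable[[float, float], float]] = {
    'latest_doubles': lambda old, new: new,  # keep the most recent value
}


def combine(urn: str, old: float, new: float) -> float:
    """Dispatch to the registered combiner for this URN."""
    return combiners[urn](old, new)
```

Without the annotation, both the wrong value type and a misspelled attribute name (the `LATEST_DOUBLES_URN` vs `LATEST_DOUBLES_TYPE` errors above) would only surface when the code path actually executes.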
[jira] [Commented] (BEAM-8989) Backwards incompatible change in ParDo.getSideInputs (caught by failure when running Apache Nemo quickstart)
[ https://issues.apache.org/jira/browse/BEAM-8989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045965#comment-17045965 ]

Rui Wang commented on BEAM-8989:

Ok. If [~wonook] agrees, I can do 2) per [~iemejia]'s suggestion, and the Nemo side should be updated. In that case, this Jira won't block the 2.20.0 release.

> Backwards incompatible change in ParDo.getSideInputs (caught by failure when running Apache Nemo quickstart)
>
> Key: BEAM-8989
> URL: https://issues.apache.org/jira/browse/BEAM-8989
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0
> Reporter: Luke Cwik
> Assignee: Reuven Lax
> Priority: Critical
> Fix For: 2.20.0
>
> [PR/9275|https://github.com/apache/beam/pull/9275] changed *ParDo.getSideInputs* from *List<PCollectionView<?>>* to *Map<String, PCollectionView<?>>*, which is a backwards-incompatible change that was erroneously released as part of Beam 2.16.0.
> Running the Apache Nemo Quickstart fails with:
>
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Translator private static void org.apache.nemo.compiler.frontend.beam.PipelineTranslator.parDoMultiOutputTranslator(org.apache.nemo.compiler.frontend.beam.PipelineTranslationContext,org.apache.beam.sdk.runners.TransformHierarchy$Node,org.apache.beam.sdk.transforms.ParDo$MultiOutput) have failed to translate org.apache.beam.examples.WordCount$ExtractWordsFn@600b9d27
>   at org.apache.nemo.compiler.frontend.beam.PipelineTranslator.translatePrimitive(PipelineTranslator.java:113)
>   at org.apache.nemo.compiler.frontend.beam.PipelineVisitor.visitPrimitiveTransform(PipelineVisitor.java:46)
>   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
>   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>   at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>   at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
>   at org.apache.nemo.compiler.frontend.beam.NemoRunner.run(NemoRunner.java:80)
>   at org.apache.nemo.compiler.frontend.beam.NemoRunner.run(NemoRunner.java:31)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>   at org.apache.beam.examples.WordCount.runWordCount(WordCount.java:185)
>   at org.apache.beam.examples.WordCount.main(WordCount.java:192)
> Caused by: java.lang.reflect.InvocationTargetException
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.nemo.compiler.frontend.beam.PipelineTranslator.translatePrimitive(PipelineTranslator.java:109)
>   ... 14 more
> Caused by: java.lang.NoSuchMethodError: org.apache.beam.sdk.transforms.ParDo$MultiOutput.getSideInputs()Ljava/util/List;
>   at org.apache.nemo.compiler.frontend.beam.PipelineTranslator.parDoMultiOutputTranslator(PipelineTranslator.java:236)
>   ... 19 more
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
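The failure above is a return-type change from a list of side-input views to a map of views. In Java this surfaces as a `NoSuchMethodError` at call time, so downstream code like Nemo must be recompiled against the new signature; a runtime shape check cannot save a compiled caller. Purely to illustrate the shape change itself, here is a hedged Python sketch (names invented, not Beam or Nemo code) of normalizing the two shapes in a dynamic setting:

```python
def side_input_views(side_inputs):
    """Normalize either the old list shape or the new map shape.

    Old API: a plain list of views.
    New API: a mapping from side-input tag to view.
    """
    if isinstance(side_inputs, dict):
        return list(side_inputs.values())
    return list(side_inputs)
```

Option 2) mentioned in the comment presumably means keeping the new map-based signature and updating the Nemo translator accordingly.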