[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=355385=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355385 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 06/Dec/19 18:50 Start Date: 06/Dec/19 18:50 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #10273: [BEAM-8427] Add MongoDB to SQL documentation URL: https://github.com/apache/beam/pull/10273 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 355385) Time Spent: 9h 20m (was: 9h 10m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Fix For: 2.18.0 > > Time Spent: 9h 20m > Remaining Estimate: 0h > > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=354850=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354850 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 06/Dec/19 01:18 Start Date: 06/Dec/19 01:18 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #10273: [BEAM-8427] Add MongoDB to SQL documentation URL: https://github.com/apache/beam/pull/10273#discussion_r354627008 ## File path: website/src/documentation/dsls/sql/extensions/create-external-table.md ## @@ -308,6 +308,43 @@ Write Mode supports writing to a topic. Only simple types are supported. +## MongoDB + +### Syntax + +``` +CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) +TYPE mongodb +LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]' +``` +* `LOCATION`: Location of the collection. +* `HOST`: Location of the MongoDB server. Can be localhost or an ip address. + When authentication is required username and password can be specified + as follows: `username:password@localhost`. +* `PORT`: Port on which MongoDB server is listening. +* `DATABASE`: Database to connect to. +* `COLLECTION`: Collection within the database. + +### Read Mode + +Read Mode supports reading from a collection. + +### Write Mode + +Write Mode supports writing to a collection. + +### Schema + +Only simple types are supported. MongoDB documents are mapped to Beam SQL types via `JsonToRow` transform. Review comment: Actually it looks like there's quite a lot of precedent within `website/src` of using `{{site.release_latest}}` for this purpose (some other places use `current` instead of a version number which I didn't realize was an option). @11moon11 could you link to JsonToRow using that? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 354850) Time Spent: 9h (was: 8h 50m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Fix For: 2.18.0 > > Time Spent: 9h > Remaining Estimate: 0h > > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
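The `LOCATION` format quoted in the diff above can be illustrated with a small, self-contained sketch that splits such a string into its parts. This is not how Beam's MongoDB table provider parses the location; the class name `MongoLocationSketch`, the regular expression, and the example database and collection names are all hypothetical and only mirror the documented shape `mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]`, with the optional `username:password@` prefix on the host.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class MongoLocationSketch {
  // Hypothetical pattern mirroring 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]'.
  // The optional first group captures a 'username:password' prefix before '@'.
  private static final Pattern LOCATION =
      Pattern.compile(
          "mongodb://(?:(?<credentials>[^@/]+)@)?"
              + "(?<host>[^:/@]+):(?<port>\\d+)"
              + "/(?<database>[^/]+)/(?<collection>[^/]+)");

  /** Returns {host, port, database, collection}; throws if the shape does not match. */
  static String[] parse(String location) {
    Matcher m = LOCATION.matcher(location);
    if (!m.matches()) {
      throw new IllegalArgumentException("Unexpected LOCATION format: " + location);
    }
    return new String[] {
      m.group("host"), m.group("port"), m.group("database"), m.group("collection")
    };
  }

  /** Returns the 'username:password' part, or null when no credentials are given. */
  static String credentials(String location) {
    Matcher m = LOCATION.matcher(location);
    return m.matches() ? m.group("credentials") : null;
  }

  public static void main(String[] args) {
    // 'testdb' and 'users' are made-up names used only for illustration.
    String[] parts = parse("mongodb://localhost:27017/testdb/users");
    System.out.println(String.join(", ", parts));
  }
}
```

A table declared with `LOCATION 'mongodb://localhost:27017/testdb/users'` would, under these assumptions, refer to the `users` collection of the `testdb` database on a local server.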
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=354851=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354851 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 06/Dec/19 01:18 Start Date: 06/Dec/19 01:18 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #10273: [BEAM-8427] Add MongoDB to SQL documentation URL: https://github.com/apache/beam/pull/10273#discussion_r354627008 ## File path: website/src/documentation/dsls/sql/extensions/create-external-table.md ## @@ -308,6 +308,43 @@ Write Mode supports writing to a topic. Only simple types are supported. +## MongoDB + +### Syntax + +``` +CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) +TYPE mongodb +LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]' +``` +* `LOCATION`: Location of the collection. +* `HOST`: Location of the MongoDB server. Can be localhost or an ip address. + When authentication is required username and password can be specified + as follows: `username:password@localhost`. +* `PORT`: Port on which MongoDB server is listening. +* `DATABASE`: Database to connect to. +* `COLLECTION`: Collection within the database. + +### Read Mode + +Read Mode supports reading from a collection. + +### Write Mode + +Write Mode supports writing to a collection. + +### Schema + +Only simple types are supported. MongoDB documents are mapped to Beam SQL types via `JsonToRow` transform. Review comment: Actually it looks like there's quite a lot of precedent within `website/src` of using `{{site.release_latest}}` for this purpose (some other places use `current` instead of a version number which I didn't realize was an option). @11moon11 could you link to JsonToRow using that? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 354851) Time Spent: 9h 10m (was: 9h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Fix For: 2.18.0 > > Time Spent: 9h 10m > Remaining Estimate: 0h > > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=354766=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354766 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 05/Dec/19 23:12 Start Date: 05/Dec/19 23:12 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on issue #10273: [BEAM-8427] Add MongoDB to SQL documentation URL: https://github.com/apache/beam/pull/10273#issuecomment-562360163 Run Website PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 354766) Time Spent: 8h 50m (was: 8h 40m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Fix For: 2.18.0 > > Time Spent: 8h 50m > Remaining Estimate: 0h > > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=354700=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354700 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 05/Dec/19 21:23 Start Date: 05/Dec/19 21:23 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on issue #10273: [BEAM-8427] Add MongoDB to SQL documentation URL: https://github.com/apache/beam/pull/10273#issuecomment-562324387 Run Website PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 354700) Time Spent: 8h 40m (was: 8.5h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Fix For: 2.18.0 > > Time Spent: 8h 40m > Remaining Estimate: 0h > > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=354670=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354670 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 05/Dec/19 20:56 Start Date: 05/Dec/19 20:56 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #10273: [BEAM-8427] Add MongoDB to SQL documentation URL: https://github.com/apache/beam/pull/10273#discussion_r354533080 ## File path: website/src/documentation/dsls/sql/extensions/create-external-table.md ## @@ -308,6 +308,43 @@ Write Mode supports writing to a topic. Only simple types are supported. +## MongoDB + +### Syntax + +``` +CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) +TYPE mongodb +LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]' +``` +* `LOCATION`: Location of the collection. +* `HOST`: Location of the MongoDB server. Can be localhost or an ip address. + When authentication is required username and password can be specified + as follows: `username:password@localhost`. +* `PORT`: Port on which MongoDB server is listening. +* `DATABASE`: Database to connect to. +* `COLLECTION`: Collection within the database. + +### Read Mode + +Read Mode supports reading from a collection. + +### Write Mode + +Write Mode supports writing to a collection. + +### Schema + +Only simple types are supported. MongoDB documents are mapped to Beam SQL types via `JsonToRow` transform. Review comment: Maybe consider adding a link to the [`JsonToRow` javadoc](https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/transforms/JsonToRow.html). I'm not sure how to handle the version number in that link though. It looks like [in the past](https://github.com/apache/beam/commit/860f5c2439b0e5f73a7a65b714a766fb504f227f#diff-b97eb3d3c732a00577269bc09cd61297L37) we used `{{site.release_latest}}` to keep it up to date. 
But that also carries the risk that the transform will change name and the link will break. @apilloud or @kennknowles do you have an opinion on this? I suppose ideally our static site generator would generate this link and simultaneously validate that JsonToRow (or at least JsonToRow.html) exists. After a quick search I don't see any way to do this with jekyll but maybe I missed something. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 354670) Time Spent: 8.5h (was: 8h 20m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Fix For: 2.18.0 > > Time Spent: 8.5h > Remaining Estimate: 0h > > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=353013=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353013 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 03/Dec/19 21:48 Start Date: 03/Dec/19 21:48 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #10273: [BEAM-8427] Add MongoDB to SQL documentation URL: https://github.com/apache/beam/pull/10273 Now that MongoDbTable exists, it should be documented on the SQL page. R: @TheNeuralBit CC: @apilloud CC: @amaliujia Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=343009=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343009 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 13/Nov/19 23:39 Start Date: 13/Nov/19 23:39 Worklog Time Spent: 10m Work Description: apilloud commented on pull request #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table URL: https://github.com/apache/beam/pull/9892 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 343009) Time Spent: 8h 10m (was: 8h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 8h 10m > Remaining Estimate: 0h > > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=343008=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343008 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 13/Nov/19 23:39 Start Date: 13/Nov/19 23:39 Worklog Time Spent: 10m Work Description: apilloud commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-553655631 I'm not quite sure this will work for DATE and TIME, but TIMESTAMP isn't supported so LGTM. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 343008) Time Spent: 8h (was: 7h 50m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 8h > Remaining Estimate: 0h > > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=341548=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-341548 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 11/Nov/19 23:55 Start Date: 11/Nov/19 23:55 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-552671228 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 341548) Time Spent: 7h 50m (was: 7h 40m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 7h 50m > Remaining Estimate: 0h > > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=341547=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-341547 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 11/Nov/19 23:55 Start Date: 11/Nov/19 23:55 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-552671228 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 341547) Time Spent: 7h 40m (was: 7.5h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 7h 40m > Remaining Estimate: 0h > > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=341522=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-341522 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 11/Nov/19 22:29 Start Date: 11/Nov/19 22:29 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-552646139 Run sql postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 341522) Time Spent: 7.5h (was: 7h 20m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 7.5h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=341501=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-341501 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 11/Nov/19 21:53 Start Date: 11/Nov/19 21:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-552633435 Run sql postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 341501) Time Spent: 7h 10m (was: 7h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 7h 10m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=341500=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-341500 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 11/Nov/19 21:53 Start Date: 11/Nov/19 21:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-552633435 Run sql postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 341500) Time Spent: 7h (was: 6h 50m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 7h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=341502=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-341502 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 11/Nov/19 21:53 Start Date: 11/Nov/19 21:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-552037374 Waiting on #10031 to get merged before merging this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 341502) Time Spent: 7h 20m (was: 7h 10m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 7h 20m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=341486=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-341486 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 11/Nov/19 21:31 Start Date: 11/Nov/19 21:31 Worklog Time Spent: 10m Work Description: apilloud commented on pull request #10031: [BEAM-8427] [SQL] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/10031 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 341486) Time Spent: 6h 50m (was: 6h 40m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 6h 50m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340828=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340828 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 09/Nov/19 00:02 Start Date: 09/Nov/19 00:02 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-552037374 Waiting on #10031 to get merged before merging this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 340828) Time Spent: 6h 40m (was: 6.5h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 6h 40m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340827=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340827 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 09/Nov/19 00:01 Start Date: 09/Nov/19 00:01 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table URL: https://github.com/apache/beam/pull/9892#discussion_r344410953 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java ## @@ -351,6 +351,9 @@ private void writeValue(JsonGenerator gen, FieldType type, Object value) throws case ROW: writeRow((Row) value, type.getRowSchema(), gen); break; +case LOGICAL_TYPE: + writeValue(gen, type.getLogicalType().getBaseType(), value); + break; Review comment: Added `makeLogicalTypeTestCase`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 340827) Time Spent: 6.5h (was: 6h 20m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 6.5h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
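The `LOGICAL_TYPE` case in the diff above delegates to the logical type's base type and passes the value through unchanged. A toy model of that delegation, independent of Beam, might look like the sketch below. The `TypeName` enum, the `FieldType` class, and the string-returning `writeValue` are simplified stand-ins invented for illustration; they are not the Beam `Schema.FieldType` or `RowJson` APIs, and no JSON string escaping is attempted.

```java
class LogicalTypeSketch {
  // Simplified stand-in for a schema type tag; not Beam's Schema.TypeName.
  enum TypeName { STRING, INT32, LOGICAL_TYPE }

  // Simplified stand-in for a field type; baseType is set only for LOGICAL_TYPE.
  static final class FieldType {
    final TypeName typeName;
    final FieldType baseType;

    private FieldType(TypeName typeName, FieldType baseType) {
      this.typeName = typeName;
      this.baseType = baseType;
    }

    static FieldType of(TypeName typeName) {
      return new FieldType(typeName, null);
    }

    static FieldType logical(FieldType baseType) {
      return new FieldType(TypeName.LOGICAL_TYPE, baseType);
    }
  }

  /** Renders a value as a JSON-like token, unwrapping logical types recursively. */
  static String writeValue(FieldType type, Object value) {
    switch (type.typeName) {
      case STRING:
        return "\"" + value + "\""; // no escaping: illustration only
      case INT32:
        return String.valueOf((Integer) value);
      case LOGICAL_TYPE:
        // Same idea as the diff: write the value using the base type's rules.
        return writeValue(type.baseType, value);
      default:
        throw new IllegalArgumentException("Unsupported type: " + type.typeName);
    }
  }

  public static void main(String[] args) {
    FieldType wrapped = FieldType.logical(FieldType.of(TypeName.STRING));
    System.out.println(writeValue(wrapped, "abc"));
  }
}
```

Because the value is handed to the base type's writer as-is, this approach presumes the runtime value already has the base type's representation; a logical type whose values need conversion first would not be covered by this sketch.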
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340826=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340826 ]

ASF GitHub Bot logged work on BEAM-8427:

Author: ASF GitHub Bot
Created on: 09/Nov/19 00:01
Start Date: 09/Nov/19 00:01
Worklog Time Spent: 10m
Work Description: 11moon11 commented on pull request #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table
URL: https://github.com/apache/beam/pull/9892#discussion_r344410918

## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java ##

@@ -99,6 +100,35 @@ public void testDocumentToRowConverter() {
     pipeline.run().waitUntilFinish();
   }

+  @Test
+  public void testRowToDocumentConverter() {
+    PCollection output =
+        pipeline
+            .apply(
+                "Create a row",
+                Create.of(
+                    row(
+                        SCHEMA,
+                        9223372036854775807L,
+                        2147483647,
+                        (short) 32767,
+                        (byte) 127,
+                        true,
+                        1.0,
+                        (float) 1.0,
+                        "string",
+                        row(
+                            Schema.builder().addNullableField("int32", INT32).build(),
+                            2147483645),

Review comment:
Instead of adding a new field, replaced an existing `string` field with a logical wrapper around the String type from CalciteUtils.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 340826)
Time Spent: 6h 20m (was: 6h 10m)

> [SQL] Add support for MongoDB source
>
>
> Key: BEAM-8427
> URL: https://issues.apache.org/jira/browse/BEAM-8427
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Kirill Kozlov
> Assignee: Kirill Kozlov
> Priority: Major
> Time Spent: 6h 20m
> Remaining Estimate: 0h
>
> In progress:
> * Create a MongoDB table and table provider.
> * Implement buildIOReader
> * Support primitive types
> Still needs to be done:
> * Implement buildIOWrite
> * improve getTableStatistics

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340769=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340769 ]

ASF GitHub Bot logged work on BEAM-8427:

Author: ASF GitHub Bot
Created on: 08/Nov/19 22:04
Start Date: 08/Nov/19 22:04
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on pull request #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table
URL: https://github.com/apache/beam/pull/9892#discussion_r344384368

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java ##

@@ -351,6 +351,9 @@ private void writeValue(JsonGenerator gen, FieldType type, Object value) throws
       case ROW:
         writeRow((Row) value, type.getRowSchema(), gen);
         break;
+      case LOGICAL_TYPE:
+        writeValue(gen, type.getLogicalType().getBaseType(), value);
+        break;

Review comment:
Nice! For some reason I was thinking this would be a big pain, but this is nice and simple :+1:

Could you add some tests of logical types to `RowJsonTest`? You could just use `LogicalTypes.FixedSizeBytes` I think. Probably the easiest way is to add a `makeLogicalTypeTestCase` here:
https://github.com/apache/beam/blob/35da90a94953597e9e5676e1e1e70f27d2a8f064/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java#L68-L73

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 340769)
Time Spent: 6h 10m (was: 6h)

> [SQL] Add support for MongoDB source
>
>
> Key: BEAM-8427
> URL: https://issues.apache.org/jira/browse/BEAM-8427
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Kirill Kozlov
> Assignee: Kirill Kozlov
> Priority: Major
> Time Spent: 6h 10m
> Remaining Estimate: 0h
>
> In progress:
> * Create a MongoDB table and table provider.
> * Implement buildIOReader
> * Support primitive types
> Still needs to be done:
> * Implement buildIOWrite
> * improve getTableStatistics

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340770=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340770 ]

ASF GitHub Bot logged work on BEAM-8427:

Author: ASF GitHub Bot
Created on: 08/Nov/19 22:04
Start Date: 08/Nov/19 22:04
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on pull request #9892: [BEAM-8427] [SQL] buildIOWrite for MongoDb Table
URL: https://github.com/apache/beam/pull/9892#discussion_r344383786

## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java ##

@@ -99,6 +100,35 @@ public void testDocumentToRowConverter() {
     pipeline.run().waitUntilFinish();
   }

+  @Test
+  public void testRowToDocumentConverter() {
+    PCollection output =
+        pipeline
+            .apply(
+                "Create a row",
+                Create.of(
+                    row(
+                        SCHEMA,
+                        9223372036854775807L,
+                        2147483647,
+                        (short) 32767,
+                        (byte) 127,
+                        true,
+                        1.0,
+                        (float) 1.0,
+                        "string",
+                        row(
+                            Schema.builder().addNullableField("int32", INT32).build(),
+                            2147483645),

Review comment:
Would be good to test logical types here (and in testDocumentToRowConverter) too

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 340770)
Time Spent: 6h 10m (was: 6h)

> [SQL] Add support for MongoDB source
>
>
> Key: BEAM-8427
> URL: https://issues.apache.org/jira/browse/BEAM-8427
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Kirill Kozlov
> Assignee: Kirill Kozlov
> Priority: Major
> Time Spent: 6h 10m
> Remaining Estimate: 0h
>
> In progress:
> * Create a MongoDB table and table provider.
> * Implement buildIOReader
> * Support primitive types
> Still needs to be done:
> * Implement buildIOWrite
> * improve getTableStatistics

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340267=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340267 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 08/Nov/19 01:10 Start Date: 08/Nov/19 01:10 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #10031: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/10031#issuecomment-551338685 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 340267) Time Spent: 6h (was: 5h 50m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 6h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340266=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340266 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 08/Nov/19 01:10 Start Date: 08/Nov/19 01:10 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #10031: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/10031#issuecomment-551338685 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 340266) Time Spent: 5h 50m (was: 5h 40m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 5h 50m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340183=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340183 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 07/Nov/19 22:15 Start Date: 07/Nov/19 22:15 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #10031: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/10031#issuecomment-551291329 Run sql postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 340183) Time Spent: 5h 40m (was: 5.5h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 5h 40m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340182=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340182 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 07/Nov/19 22:15 Start Date: 07/Nov/19 22:15 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #10031: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/10031#issuecomment-551291329 Run sql postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 340182) Time Spent: 5.5h (was: 5h 20m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 5.5h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340178=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340178 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 07/Nov/19 22:03 Start Date: 07/Nov/19 22:03 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #10031: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/10031#issuecomment-551287100 Run sql postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 340178) Time Spent: 5h 20m (was: 5h 10m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 5h 20m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340177=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340177 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 07/Nov/19 22:03 Start Date: 07/Nov/19 22:03 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #10031: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/10031#issuecomment-551287100 Run sql postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 340177) Time Spent: 5h 10m (was: 5h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 5h 10m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340172=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340172 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 07/Nov/19 21:41 Start Date: 07/Nov/19 21:41 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #10031: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/10031#issuecomment-551278228 Run sql postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 340172) Time Spent: 5h (was: 4h 50m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 5h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340171=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340171 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 07/Nov/19 21:41 Start Date: 07/Nov/19 21:41 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #10031: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/10031#issuecomment-551278228 Run sql postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 340171) Time Spent: 4h 50m (was: 4h 40m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 4h 50m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=340167=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340167 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 07/Nov/19 21:37 Start Date: 07/Nov/19 21:37 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #10031: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/10031 - Create a Table with read support (for now, write will be added in a separate PR) - Add some tests Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=339621=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339621 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 06/Nov/19 22:37 Start Date: 06/Nov/19 22:37 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #10018: Revert "[BEAM-8427] Create a table and a table provider for MongoDB" URL: https://github.com/apache/beam/pull/10018 Reverts apache/beam#9806 Post Commit tests break. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 339621) Time Spent: 4.5h (was: 4h 20m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 4.5h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=339559=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339559 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 06/Nov/19 19:25 Start Date: 06/Nov/19 19:25 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite from MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-550462776 R: @TheNeuralBit cc: @apilloud cc: @amaliujia This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 339559) Time Spent: 4h 20m (was: 4h 10m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 4h 20m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=338947=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338947 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 05/Nov/19 18:59 Start Date: 05/Nov/19 18:59 Worklog Time Spent: 10m Work Description: apilloud commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 338947) Time Spent: 4h 10m (was: 4h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 4h 10m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=338946=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338946 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 05/Nov/19 18:59 Start Date: 05/Nov/19 18:59 Worklog Time Spent: 10m Work Description: apilloud commented on issue #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#issuecomment-549968943 LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 338946) Time Spent: 4h (was: 3h 50m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 4h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=337620=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337620 ]

ASF GitHub Bot logged work on BEAM-8427:

Author: ASF GitHub Bot
Created on: 01/Nov/19 23:52
Start Date: 01/Nov/19 23:52
Worklog Time Spent: 10m
Work Description: 11moon11 commented on pull request #9892: [BEAM-8427] [SQL] buildIOWrite from MongoDb Table
URL: https://github.com/apache/beam/pull/9892#discussion_r341785685

## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java ##

@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb;
+
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+import static org.junit.Assert.assertEquals;
+
+import com.mongodb.MongoClient;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.io.mongodb.MongoDBIOIT.MongoDBPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * A test of {@link org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable} on an
+ * independent Mongo instance.
+ *
+ * This test requires a running instance of MongoDB. Pass in connection information using
+ * PipelineOptions:
+ *
+ *
+ * ./gradlew integrationTest -p sdks/java/extensions/sql/integrationTest -DintegrationTestPipelineOptions='[
+ * "--mongoDBHostName=1.2.3.4",
+ * "--mongoDBPort=27017",
+ * "--mongoDBDatabaseName=mypass",
+ * "--numberOfRecords=1000" ]'
+ * --tests org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbReadWriteIT
+ * -DintegrationTestRunner=direct
+ *
+ *
+ * A database, specified in the pipeline options, will be created implicitly if it does not exist
+ * already. And dropped upon completing tests.
+ *
+ * Please see 'build_rules.gradle' file for instructions regarding running this test using Beam
+ * performance testing framework.
+ */
+@RunWith(JUnit4.class)
+public class MongoDbReadWriteIT {
+  private static final Schema SOURCE_SCHEMA =
+      Schema.builder()
+          .addNullableField("_id", STRING)
+          .addNullableField("c_bigint", INT64)
+          .addNullableField("c_tinyint", BYTE)
+          .addNullableField("c_smallint", INT16)
+          .addNullableField("c_integer", INT32)
+          .addNullableField("c_float", FLOAT)
+          .addNullableField("c_double", DOUBLE)
+          .addNullableField("c_boolean", BOOLEAN)
+          .addNullableField("c_varchar", STRING)
+          .addNullableField("c_arr", FieldType.array(STRING))
+          .build();
+  private static final String collection = "collection";
+  private static MongoDBPipelineOptions options;
+
+  @Rule public final TestPipeline writePipeline = TestPipeline.create();
+  @Rule public final TestPipeline readPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
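The quoted test file is cut off before the point where the table is declared, so the following is only a sketch of how the host, port, and database options shown above might be turned into a table definition. It assumes the `CREATE EXTERNAL TABLE ... TYPE mongodb LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]'` syntax from the documentation change in PR #10273. The class `MongoDbTableDdlSketch`, its methods, the table name `TEST`, the database name `beam_test`, and the SQL column types (guessed from the column names in `SOURCE_SCHEMA`) are all hypothetical, not taken from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that assembles a MongoDB external-table DDL string. */
public class MongoDbTableDdlSketch {

  /** Builds a location of the form mongodb://HOST:PORT/DATABASE/COLLECTION. */
  public static String location(String host, int port, String database, String collection) {
    return "mongodb://" + host + ":" + port + "/" + database + "/" + collection;
  }

  /** Joins column definitions and the location into one CREATE EXTERNAL TABLE statement. */
  public static String createTableDdl(
      String tableName, Map<String, String> columns, String location) {
    StringJoiner cols = new StringJoiner(", ", "(", ")");
    for (Map.Entry<String, String> column : columns.entrySet()) {
      cols.add(column.getKey() + " " + column.getValue());
    }
    return "CREATE EXTERNAL TABLE " + tableName + " " + cols
        + " TYPE mongodb LOCATION '" + location + "'";
  }

  public static void main(String[] args) {
    // Column types are assumptions inferred from the field names in SOURCE_SCHEMA.
    Map<String, String> columns = new LinkedHashMap<>();
    columns.put("_id", "VARCHAR");
    columns.put("c_bigint", "BIGINT");
    columns.put("c_tinyint", "TINYINT");
    columns.put("c_smallint", "SMALLINT");
    columns.put("c_integer", "INTEGER");
    columns.put("c_float", "FLOAT");
    columns.put("c_double", "DOUBLE");
    columns.put("c_boolean", "BOOLEAN");
    columns.put("c_varchar", "VARCHAR");
    columns.put("c_arr", "ARRAY<VARCHAR>");

    // Host and port mirror the example pipeline options; the database name is made up.
    String location = location("1.2.3.4", 27017, "beam_test", "collection");
    System.out.println(createTableDdl("TEST", columns, location));
  }
}
```

The sketch only builds a string. Running the statement against Beam SQL, and whether every listed column type is accepted by the MongoDB table provider, are outside what this thread confirms.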
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=337621=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337621 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 01/Nov/19 23:52 Start Date: 01/Nov/19 23:52 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9892: [BEAM-8427] [SQL] buildIOWrite from MongoDb Table URL: https://github.com/apache/beam/pull/9892#discussion_r341785685 ## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; +import static org.junit.Assert.assertEquals; + +import com.mongodb.MongoClient; +import java.util.Arrays; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; +import org.apache.beam.sdk.io.mongodb.MongoDBIOIT.MongoDBPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * A test of {@link org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable} on an + * independent Mongo instance. + * + * This test requires a running instance of MongoDB. 
Pass in connection information using + * PipelineOptions: + * + * + * ./gradlew integrationTest -p sdks/java/extensions/sql/integrationTest -DintegrationTestPipelineOptions='[ + * "--mongoDBHostName=1.2.3.4", + * "--mongoDBPort=27017", + * "--mongoDBDatabaseName=mypass", + * "--numberOfRecords=1000" ]' + * --tests org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbReadWriteIT + * -DintegrationTestRunner=direct + * + * + * A database, specified in the pipeline options, will be created implicitly if it does not exist + * already. And dropped upon completing tests. + * + * Please see 'build_rules.gradle' file for instructions regarding running this test using Beam + * performance testing framework. + */ +@RunWith(JUnit4.class) +public class MongoDbReadWriteIT { + private static final Schema SOURCE_SCHEMA = + Schema.builder() + .addNullableField("_id", STRING) + .addNullableField("c_bigint", INT64) + .addNullableField("c_tinyint", BYTE) + .addNullableField("c_smallint", INT16) + .addNullableField("c_integer", INT32) + .addNullableField("c_float", FLOAT) + .addNullableField("c_double", DOUBLE) + .addNullableField("c_boolean", BOOLEAN) + .addNullableField("c_varchar", STRING) + .addNullableField("c_arr", FieldType.array(STRING)) + .build(); + private static final String collection = "collection"; + private static MongoDBPipelineOptions options; + + @Rule public final TestPipeline writePipeline = TestPipeline.create(); + @Rule public final TestPipeline readPipeline = TestPipeline.create(); + + @BeforeClass + public static void setUp() throws Exception { +
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=337028=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337028 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 31/Oct/19 20:23 Start Date: 31/Oct/19 20:23 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite from MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-548553421 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 337028) Time Spent: 3.5h (was: 3h 20m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 3.5h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=337027=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337027 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 31/Oct/19 20:23 Start Date: 31/Oct/19 20:23 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9892: [BEAM-8427] [SQL] buildIOWrite from MongoDb Table URL: https://github.com/apache/beam/pull/9892#issuecomment-548553421 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 337027) Time Spent: 3h 20m (was: 3h 10m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=337018=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337018 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 31/Oct/19 20:09 Start Date: 31/Oct/19 20:09 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r341343191 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?mongodb://(?.*:.*@)?.+:\\d+)/(?.+)/(?.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'"); Review comment: I see what you mean; I replaced `?` with `[` `]` to be consistent with the MongoDbIO format. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 337018) Time Spent: 3h 10m (was: 3h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
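For readers following the location-format discussion: the pattern as quoted in this archive seems to have lost its named groups, although the `matcher.group("credsHostPort")`, `matcher.group("database")` and `matcher.group("collection")` calls quoted elsewhere in the thread depend on them. Below is a minimal sketch of that kind of parsing, assuming those three group names. The class `MongoLocation`, its fields and the character classes are hypothetical and are not the code from the PR.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; this is not the Beam MongoDbTable class.
final class MongoLocation {
  // Group names follow the matcher.group(...) calls quoted in the thread.
  // The character classes are an assumption chosen to keep the sketch strict.
  private static final Pattern LOCATION =
      Pattern.compile(
          "(?<credsHostPort>mongodb://(?:[^/@:]+:[^/@]*@)?[^/@:]+:\\d+)"
              + "/(?<database>[^/]+)/(?<collection>[^/]+)");

  final String uri;
  final String database;
  final String collection;

  private MongoLocation(String uri, String database, String collection) {
    this.uri = uri;
    this.database = database;
    this.collection = collection;
  }

  // Parses a location string, rejecting anything that does not match the whole pattern.
  static MongoLocation parse(String location) {
    Matcher matcher = LOCATION.matcher(location);
    if (!matcher.matches()) {
      // Square brackets mark the optional part, as agreed in the review comment.
      throw new IllegalArgumentException(
          "MongoDb location must be in the following format: "
              + "'mongodb://[username:password@]localhost:27017/database/collection'");
    }
    return new MongoLocation(
        matcher.group("credsHostPort"), matcher.group("database"), matcher.group("collection"));
  }
}
```

With this sketch, `MongoLocation.parse("mongodb://localhost:27017/database/collection")` yields the URI `mongodb://localhost:27017`, and a location without a port is rejected with the bracketed message.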
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=337003=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337003 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 31/Oct/19 19:11 Start Date: 31/Oct/19 19:11 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r341320380 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?mongodb://(?.*:.*@)?.+:\\d+)/(?.+)/(?.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'");
+    this.dbUri = matcher.group("credsHostPort"); // "mongodb://localhost:27017"
+    this.dbName = matcher.group("database");
+    this.dbCollection = matcher.group("collection");
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    // Read MongoDb Documents
+    PCollection<Document> readDocuments =
+        MongoDbIO.read()
+            .withUri(dbUri)
+            .withDatabase(dbName)
+            .withCollection(dbCollection)
+            .expand(begin);
+
+    return readDocuments
+        // TODO: figure out a way convert Document directly to Row.
+        .apply("Convert Document to JSON", createParserParDo())
+        .apply("Transform JSON to Row", JsonToRow.withSchema(getSchema()))
+        .setRowSchema(getSchema());
+  }
+
+  @Override
+  public POutput buildIOWriter(PCollection<Row> input) {
+    throw new UnsupportedOperationException("Writing to a MongoDB is not supported");
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return IsBounded.BOUNDED;
+  }
+
+  @Override
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    return BeamTableStatistics.BOUNDED_UNKNOWN;

Review comment: Awesome, thanks! This is an automated message from the Apache Git Service. To respond to the message,
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=336999=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-336999 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 31/Oct/19 18:50 Start Date: 31/Oct/19 18:50 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r341311660 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?mongodb://(?.*:.*@)?.+:\\d+)/(?.+)/(?.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'"); Review comment: I guess I was expecting something like the MongoDbIO string you linked, where the parens (or square brackets) indicate something is optional on their own. To me this looks like the `?` could be part of the actual location string. This is just a bikeshed, I'm sure it will get the point across to any reasonable user either way, so I'm fine with leaving it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 336999) Time Spent: 2h 50m (was: 2h 40m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335260=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335260 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 23:12 Start Date: 28/Oct/19 23:12 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#issuecomment-547185697 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 335260) Time Spent: 2h 40m (was: 2.5h) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 2h 40m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335259=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335259 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 23:12 Start Date: 28/Oct/19 23:12 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#issuecomment-547185697 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 335259) Time Spent: 2.5h (was: 2h 20m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 2.5h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335177=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335177 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 20:54 Start Date: 28/Oct/19 20:54 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339787399 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?mongodb://(?.*:.*@)?.+:\\d+)/(?.+)/(?.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'");

Review comment: I believe that `?` should be there, since `username:password@` is optional according to the JavaDoc here: https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L208-L225

It also states that `For some authentication mechanisms, only the username is specified and the password is not, in which case the ":" after the username is left off as well`, so I updated the pattern to take that into account as well (and added tests to make sure it works as intended). I decided to make the port number mandatory for now.

I think it would be a good idea to break that regular expression down into multiple logical layers (e.g. `layer1` -> extract `uri`, `database`, and `collection`; `layer2` -> extract `username:password@`, `port`). The main benefit is improved readability, especially if we plan to add support for all features mentioned in the JavaDoc: https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L202

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and
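The layering proposed in this comment could look roughly like the sketch below: one pattern splits the location into URI, database and collection, and a second pattern splits the URI into optional credentials, host and a mandatory port. It also accepts a username without a password, as described in the quoted JavaDoc sentence. Everything here is an assumption made for illustration: the class `LayeredMongoLocation`, the group names `uri`, `username`, `password`, `host` and `port`, and the exact patterns are not taken from the PR.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical two-layer parser; all names and patterns are illustrative.
final class LayeredMongoLocation {
  // Layer 1: split the location into URI, database and collection.
  private static final Pattern LAYER1 =
      Pattern.compile("(?<uri>mongodb://[^/]+)/(?<database>[^/]+)/(?<collection>[^/]+)");

  // Layer 2: split the URI into optional credentials, host and a mandatory port.
  // The ":password" part is optional, so a bare "username@" is accepted too.
  private static final Pattern LAYER2 =
      Pattern.compile(
          "mongodb://(?:(?<username>[^:@]+)(?::(?<password>[^@]*))?@)?"
              + "(?<host>[^:@]+):(?<port>\\d+)");

  final String uri;
  final String database;
  final String collection;
  final String username; // null when no credentials are given
  final String password; // null when only a username is given
  final String host;
  final int port;

  private LayeredMongoLocation(Matcher outer, Matcher inner) {
    this.uri = outer.group("uri");
    this.database = outer.group("database");
    this.collection = outer.group("collection");
    this.username = inner.group("username");
    this.password = inner.group("password");
    this.host = inner.group("host");
    this.port = Integer.parseInt(inner.group("port"));
  }

  static LayeredMongoLocation parse(String location) {
    Matcher outer = LAYER1.matcher(location);
    if (!outer.matches()) {
      throw new IllegalArgumentException(
          "Expected 'mongodb://.../database/collection' but got: " + location);
    }
    Matcher inner = LAYER2.matcher(outer.group("uri"));
    if (!inner.matches()) {
      throw new IllegalArgumentException(
          "Expected 'mongodb://[username[:password]@]host:port' but got: " + outer.group("uri"));
    }
    return new LayeredMongoLocation(outer, inner);
  }
}
```

Keeping the two layers separate means each error message can say which part of the location is malformed, which is the readability benefit the comment argues for.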
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335171=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335171 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 20:53 Start Date: 28/Oct/19 20:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339787255 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?mongodb://(?.*:.*@)?.+:\\d+)/(?.+)/(?.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'"); +this.dbUri = matcher.group("credsHostPort"); // "mongodb://localhost:27017" +this.dbName = matcher.group("database"); +this.dbCollection = matcher.group("collection"); + } + + @Override + public PCollection<Row> buildIOReader(PBegin begin) { +// Read MongoDb Documents +PCollection<Document> readDocuments = +MongoDbIO.read() +.withUri(dbUri) +.withDatabase(dbName) +.withCollection(dbCollection) +.expand(begin); + +return readDocuments +// TODO: figure out a way convert Document directly to Row. +.apply("Convert Document to JSON", createParserParDo()) +.apply("Transform JSON to Row", JsonToRow.withSchema(getSchema())) +.setRowSchema(getSchema()); + } + + @Override + public POutput buildIOWriter(PCollection<Row> input) { +throw new UnsupportedOperationException("Writing to a MongoDB is not supported"); + } + + @Override + public IsBounded isBounded() { +return IsBounded.BOUNDED; + } + + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { +return BeamTableStatistics.BOUNDED_UNKNOWN; Review comment: Right now MongoDbIO can only get an estimated byte size via `MongoDbIO.BoundedMongoDbSource#getEstimatedSizeBytes`. I added a similar method to extract a `count` of Documents.
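The location parsing in the quoted constructor can be tried in isolation. The following is a minimal sketch using only `java.util.regex`, not the code from the pull request: the group names `credsHostPort`, `database` and `collection` are taken from the `matcher.group(...)` calls above, while the name of the optional credentials group (`usernamePassword`), the class `MongoLocationSketch` and the method `parse` are assumptions made for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MongoLocationSketch {
  // Named groups mirror the names read back via matcher.group(...) in the quoted diff.
  // "usernamePassword" is an assumed name for the optional credentials group.
  static final Pattern LOCATION =
      Pattern.compile(
          "(?<credsHostPort>mongodb://(?<usernamePassword>.*:.*@)?.+:\\d+)"
              + "/(?<database>.+)/(?<collection>.+)");

  /** Returns {uri, database, collection}; throws if the location does not match. */
  static String[] parse(String location) {
    Matcher m = LOCATION.matcher(location);
    if (!m.matches()) {
      throw new IllegalArgumentException("Unexpected MongoDB location: " + location);
    }
    return new String[] {m.group("credsHostPort"), m.group("database"), m.group("collection")};
  }

  public static void main(String[] args) {
    String[] parts = parse("mongodb://username:password@localhost:27017/database/collection");
    // Print the three extracted parts separated by " | ".
    System.out.println(String.join(" | ", parts));
  }
}
```

Since every group in this sketch is greedy, a location with more than two path segments is split at the last `/`, so `mongodb://h:1/a/b/c` would yield database `a/b` and collection `c`.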
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335173=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335173 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 20:53 Start Date: 28/Oct/19 20:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339787299 ## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; + +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class MongoDbTableTest { + + private static final Schema SCHEMA = + Schema.builder() + .addNullableField("long", INT64) + .addNullableField("int32", INT32) + .addNullableField("int16", INT16) + .addNullableField("byte", BYTE) + .addNullableField("bool", BOOLEAN) + .addNullableField("double", DOUBLE) + .addNullableField("float", FLOAT) + .addNullableField("string", STRING) + .addNullableField("arr", FieldType.array(STRING)) + .build(); + private static final String JSON_ROW = + "{ " + + "\"long\" : 9223372036854775807, " + + "\"int32\" : 2147483647, " + + "\"int16\" : 32767, " + + "\"byte\" : 127, " + + "\"bool\" : true, " + + "\"double\" : 1.0, " + + "\"float\" : 1.0, " + + "\"string\" : \"string\", " + + "\"arr\" : [\"str1\", \"str2\", \"str3\"]" + + " }"; + private static final MongoDbTable SQL_TABLE = + 
(MongoDbTable) + new MongoDbTableProvider() + .buildBeamSqlTable( + fakeTable("TEST", "mongodb://localhost:27017/database/collection")); + + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testDocumentToRowConverter() { +PCollection<String> output = +pipeline +.apply("Create document from JSON", Create.of(Document.parse(JSON_ROW))) +.apply("Decode document back into JSON", SQL_TABLE.createParserParDo()); + +// Make sure JSON was decoded correctly +PAssert.that(output).containsInAnyOrder(JSON_ROW); Review comment: Makes sense, updated the test to check that the same `Row`s are produced. It would be a good thing to add at some point! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 335173) Time Spent: 1h 40m (was: 1.5h) > [SQL]
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335175=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335175 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 20:53 Start Date: 28/Oct/19 20:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339787353 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/package-info.java ## @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Table schema for MongoDb. */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; Review comment: Updated the package-info file, thanks for pointing this out! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 335175) Time Spent: 2h (was: 1h 50m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335176=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335176 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 20:53 Start Date: 28/Oct/19 20:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339787370 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?<credsHostPort>mongodb://(?<usernamePassword>.*:.*@)?.+:\\d+)/(?<database>.+)/(?<collection>.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'"); +this.dbUri = matcher.group("credsHostPort"); // "mongodb://localhost:27017" +this.dbName = matcher.group("database"); +this.dbCollection = matcher.group("collection"); + } + + @Override + public PCollection<Row> buildIOReader(PBegin begin) { +// Read MongoDb Documents +PCollection<Document> readDocuments = +MongoDbIO.read() +.withUri(dbUri) +.withDatabase(dbName) +.withCollection(dbCollection) +.expand(begin); + +return readDocuments +// TODO: figure out a way convert Document directly to Row. +.apply("Convert Document to JSON", createParserParDo()) +.apply("Transform JSON to Row", JsonToRow.withSchema(getSchema())) +.setRowSchema(getSchema()); + } + + @Override + public POutput buildIOWriter(PCollection<Row> input) { +throw new UnsupportedOperationException("Writing to a MongoDB is not supported"); + } + + @Override + public IsBounded isBounded() { +return IsBounded.BOUNDED; + } + + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { +return BeamTableStatistics.BOUNDED_UNKNOWN; + } + + @VisibleForTesting + SingleOutput<Document, String> createParserParDo() { +return ParDo.of(new DocumentToJsonStringConverter()); + } + + // TODO: add support for complex fields. + @VisibleForTesting +
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335169=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335169 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 20:53 Start Date: 28/Oct/19 20:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339787221 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?<credsHostPort>mongodb://(?<usernamePassword>.*:.*@)?.+:\\d+)/(?<database>.+)/(?<collection>.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'"); +this.dbUri = matcher.group("credsHostPort"); // "mongodb://localhost:27017" +this.dbName = matcher.group("database"); +this.dbCollection = matcher.group("collection"); + } + + @Override + public PCollection<Row> buildIOReader(PBegin begin) { +// Read MongoDb Documents +PCollection<Document> readDocuments = +MongoDbIO.read() +.withUri(dbUri) +.withDatabase(dbName) +.withCollection(dbCollection) +.expand(begin); + +return readDocuments +// TODO: figure out a way convert Document directly to Row. Review comment: Created a Jira and updated a TODO comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 335169) Time Spent: 1h (was: 50m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components:
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335172=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335172 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 20:53 Start Date: 28/Oct/19 20:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339787287 ## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; +import static org.junit.Assert.assertEquals; + +import com.mongodb.MongoClient; +import java.util.Arrays; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; +import org.apache.beam.sdk.io.mongodb.MongoDBIOIT.MongoDBPipelineOptions; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.ToJson; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * A test of {@link org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable} on an + * independent Mongo instance. + * + * This test requires a running instance of MongoDB. 
Pass in connection information using + * PipelineOptions: + * + * + * ./gradlew integrationTest -p sdks/java/extensions/sql/integrationTest -DintegrationTestPipelineOptions='[ + * "--mongoDBHostName=1.2.3.4", + * "--mongoDBPort=27017", + * "--mongoDBDatabaseName=mypass", + * "--numberOfRecords=1000" ]' + * --tests org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbReadWriteIT + * -DintegrationTestRunner=direct + * + * + * Please see 'build_rules.gradle' file for instructions regarding running this test using Beam + * performance testing framework. + */ +@RunWith(JUnit4.class) +public class MongoDbReadWriteIT { + private static final Schema SOURCE_SCHEMA = + Schema.builder() + .addNullableField("_id", STRING) + .addNullableField("c_bigint", INT64) + .addNullableField("c_tinyint", BYTE) + .addNullableField("c_smallint", INT16) + .addNullableField("c_integer", INT32) + .addNullableField("c_float", FLOAT) + .addNullableField("c_double", DOUBLE) + .addNullableField("c_boolean", BOOLEAN) + .addNullableField("c_varchar", STRING) + .addNullableField("c_arr", FieldType.array(STRING)) + .build(); + private static final String collection = "collection"; + private static MongoDBPipelineOptions options; + + @Rule public final TestPipeline writePipeline = TestPipeline.create(); + @Rule public final
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335174=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335174 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 20:53 Start Date: 28/Oct/19 20:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339787322 ## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; +import static org.junit.Assert.assertEquals; + +import com.mongodb.MongoClient; +import java.util.Arrays; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; +import org.apache.beam.sdk.io.mongodb.MongoDBIOIT.MongoDBPipelineOptions; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.ToJson; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * A test of {@link org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable} on an + * independent Mongo instance. + * + * This test requires a running instance of MongoDB. 
Pass in connection information using + * PipelineOptions: + * + * + * ./gradlew integrationTest -p sdks/java/extensions/sql/integrationTest -DintegrationTestPipelineOptions='[ + * "--mongoDBHostName=1.2.3.4", + * "--mongoDBPort=27017", + * "--mongoDBDatabaseName=mypass", + * "--numberOfRecords=1000" ]' + * --tests org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbReadWriteIT + * -DintegrationTestRunner=direct + * + * + * Please see 'build_rules.gradle' file for instructions regarding running this test using Beam + * performance testing framework. + */ +@RunWith(JUnit4.class) +public class MongoDbReadWriteIT { + private static final Schema SOURCE_SCHEMA = + Schema.builder() + .addNullableField("_id", STRING) + .addNullableField("c_bigint", INT64) + .addNullableField("c_tinyint", BYTE) + .addNullableField("c_smallint", INT16) + .addNullableField("c_integer", INT32) + .addNullableField("c_float", FLOAT) + .addNullableField("c_double", DOUBLE) + .addNullableField("c_boolean", BOOLEAN) + .addNullableField("c_varchar", STRING) + .addNullableField("c_arr", FieldType.array(STRING)) + .build(); + private static final String collection = "collection"; + private static MongoDBPipelineOptions options; + + @Rule public final TestPipeline writePipeline = TestPipeline.create(); + @Rule public final
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=335170=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335170 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 28/Oct/19 20:53 Start Date: 28/Oct/19 20:53 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339787238 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?<credsHostPort>mongodb://(?<usernamePassword>.*:.*@)?.+:\\d+)/(?<database>.+)/(?<collection>.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'"); +this.dbUri = matcher.group("credsHostPort"); // "mongodb://localhost:27017" +this.dbName = matcher.group("database"); +this.dbCollection = matcher.group("collection"); + } + + @Override + public PCollection<Row> buildIOReader(PBegin begin) { +// Read MongoDb Documents +PCollection<Document> readDocuments = +MongoDbIO.read() +.withUri(dbUri) +.withDatabase(dbName) +.withCollection(dbCollection) +.expand(begin); + +return readDocuments +// TODO: figure out a way convert Document directly to Row. +.apply("Convert Document to JSON", createParserParDo()) +.apply("Transform JSON to Row", JsonToRow.withSchema(getSchema())) +.setRowSchema(getSchema()); + } + + @Override + public POutput buildIOWriter(PCollection<Row> input) { +throw new UnsupportedOperationException("Writing to a MongoDB is not supported"); + } + + @Override + public IsBounded isBounded() { +return IsBounded.BOUNDED; + } + + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { +return BeamTableStatistics.BOUNDED_UNKNOWN; + } + + @VisibleForTesting + SingleOutput<Document, String> createParserParDo() { +return ParDo.of(new DocumentToJsonStringConverter()); + } + + // TODO: add support for complex fields. Review comment:
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=334439=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334439 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 23:59 Start Date: 25/Oct/19 23:59 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339267118 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?mongodb://(?.*:.*@)?.+:\\d+)/(?.+)/(?.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'");
+    this.dbUri = matcher.group("credsHostPort"); // "mongodb://localhost:27017"
+    this.dbName = matcher.group("database");
+    this.dbCollection = matcher.group("collection");
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    // Read MongoDb Documents
+    PCollection<Document> readDocuments =
+        MongoDbIO.read()
+            .withUri(dbUri)
+            .withDatabase(dbName)
+            .withCollection(dbCollection)
+            .expand(begin);
+
+    return readDocuments
+        // TODO: figure out a way convert Document directly to Row.
+        .apply("Convert Document to JSON", createParserParDo())
+        .apply("Transform JSON to Row", JsonToRow.withSchema(getSchema()))
+        .setRowSchema(getSchema());
+  }
+
+  @Override
+  public POutput buildIOWriter(PCollection<Row> input) {
+    throw new UnsupportedOperationException("Writing to a MongoDB is not supported");
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return IsBounded.BOUNDED;
+  }
+
+  @Override
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    return BeamTableStatistics.BOUNDED_UNKNOWN;

Review comment: Does MongoDB have any interface we could use to implement this? If so, a TODO/Jira pointing in the right direction would be nice. If not, maybe add a comment stating why.
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=334443=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334443 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 23:59 Start Date: 25/Oct/19 23:59 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339272952 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/package-info.java ## @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Table schema for MongoDb. */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; Review comment: I think these package-info files are supposed to have `@DefaultAnnotation(NonNull.class)` like [this](https://github.com/apache/beam/blob/928c4df93491f1d15ccfe36fe7887a27e4bdfbdc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/package-info.java#L20). 
(Although it looks like not all of them [do right now](https://github.com/apache/beam/blob/928c4df93491f1d15ccfe36fe7887a27e4bdfbdc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/package-info.java#L20)).

Issue Time Tracking
---
Worklog Id: (was: 334443)
Time Spent: 50m (was: 40m)

> [SQL] Add support for MongoDB source
>
> Key: BEAM-8427
> URL: https://issues.apache.org/jira/browse/BEAM-8427
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Kirill Kozlov
> Assignee: Kirill Kozlov
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> In progress:
> * Create a MongoDB table and table provider.
> * Implement buildIOReader
> * Support primitive types
> Still needs to be done:
> * Implement buildIOWrite
> * improve getTableStatistics
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=334441=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334441 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 23:59 Start Date: 25/Oct/19 23:59 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339273986 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?mongodb://(?.*:.*@)?.+:\\d+)/(?.+)/(?.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'");

Review comment: I think maybe this `?` is a copy-pasta error. That's not actually part of the format, is it?

Issue Time Tracking
---
Worklog Id: (was: 334441)
Time Spent: 50m (was: 40m)

> [SQL] Add support for MongoDB source
>
> Key: BEAM-8427
> URL: https://issues.apache.org/jira/browse/BEAM-8427
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Kirill Kozlov
> Assignee: Kirill Kozlov
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> In progress:
> * Create a MongoDB table and table provider.
> * Implement buildIOReader
> * Support primitive types
> Still needs to be done:
> * Implement buildIOWrite
> * improve getTableStatistics
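For readers following the location-format discussion in the review comment above, here is a minimal stand-alone sketch of how such a location string could be parsed with named regex groups. The group names `credsHostPort`, `database` and `collection` are the ones the quoted constructor reads; the class name `MongoLocationSketch`, the `parse` helper, the non-capturing credentials group, and the bracketed wording of the error message are assumptions made for illustration and are not taken from the PR.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-alone helper for illustration; not the class under review. */
public class MongoLocationSketch {
  // "credsHostPort", "database" and "collection" are the groups the quoted
  // constructor reads. Making the credentials part a non-capturing optional
  // group is an assumption of this sketch.
  static final Pattern LOCATION_PATTERN =
      Pattern.compile(
          "(?<credsHostPort>mongodb://(?:.*:.*@)?.+:\\d+)/(?<database>.+)/(?<collection>.+)");

  /** Returns {uri, database, collection}, or throws if the location is malformed. */
  static String[] parse(String location) {
    Matcher matcher = LOCATION_PATTERN.matcher(location);
    if (!matcher.matches()) {
      // The optional part is written with brackets instead of regex syntax,
      // so the message cannot be mistaken for a literal '?' in the format.
      throw new IllegalArgumentException(
          "MongoDb location must be in the following format: "
              + "'mongodb://[username:password@]host:port/database/collection'");
    }
    return new String[] {
      matcher.group("credsHostPort"), matcher.group("database"), matcher.group("collection")
    };
  }

  public static void main(String[] args) {
    String[] parts = parse("mongodb://username:password@localhost:27017/database/collection");
    System.out.println(String.join(" | ", parts));
  }
}
```

Spelling the optional credentials as `[username:password@]` in the message is one way to address the reviewer's question; whether the PR adopted that wording is not shown in this thread.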
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=33=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-33 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 23:59 Start Date: 25/Oct/19 23:59 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339273805 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?mongodb://(?.*:.*@)?.+:\\d+)/(?.+)/(?.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'"); +this.dbUri = matcher.group("credsHostPort"); // "mongodb://localhost:27017" +this.dbName = matcher.group("database"); +this.dbCollection = matcher.group("collection"); + } + + @Override + public PCollection buildIOReader(PBegin begin) { +// Read MongoDb Documents +PCollection readDocuments = +MongoDbIO.read() +.withUri(dbUri) +.withDatabase(dbName) +.withCollection(dbCollection) +.expand(begin); + +return readDocuments +// TODO: figure out a way convert Document directly to Row. +.apply("Convert Document to JSON", createParserParDo()) +.apply("Transform JSON to Row", JsonToRow.withSchema(getSchema())) +.setRowSchema(getSchema()); + } + + @Override + public POutput buildIOWriter(PCollection input) { +throw new UnsupportedOperationException("Writing to a MongoDB is not supported"); + } + + @Override + public IsBounded isBounded() { +return IsBounded.BOUNDED; + } + + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { +return BeamTableStatistics.BOUNDED_UNKNOWN; + } + + @VisibleForTesting + SingleOutput createParserParDo() { +return ParDo.of(new DocumentToJsonStringConverter()); + } + + // TODO: add support for complex fields. +
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=334440=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334440 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 23:59 Start Date: 25/Oct/19 23:59 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339264449 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; +import org.bson.Document; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +@Experimental +public class MongoDbTable extends SchemaBaseBeamTable implements Serializable { + // Should match: mongodb://username:password@localhost:27017/database/collection + @VisibleForTesting + final Pattern locationPattern = + Pattern.compile( + "(?mongodb://(?.*:.*@)?.+:\\d+)/(?.+)/(?.+)"); + + @VisibleForTesting final String dbCollection; + @VisibleForTesting final String dbName; + @VisibleForTesting final String dbUri; + + MongoDbTable(Table table) { +super(table.getSchema()); + +String location = table.getLocation(); +Matcher matcher = locationPattern.matcher(location); +checkArgument( +matcher.matches(), +"MongoDb location must be in the following format: 
'mongodb://(username:password@)?localhost:27017/database/collection'");
+    this.dbUri = matcher.group("credsHostPort"); // "mongodb://localhost:27017"
+    this.dbName = matcher.group("database");
+    this.dbCollection = matcher.group("collection");
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    // Read MongoDb Documents
+    PCollection<Document> readDocuments =
+        MongoDbIO.read()
+            .withUri(dbUri)
+            .withDatabase(dbName)
+            .withCollection(dbCollection)
+            .expand(begin);
+
+    return readDocuments
+        // TODO: figure out a way convert Document directly to Row.

Review comment: Can you make a jira for this and change this to `TODO(BEAM-)`? Also maybe add a note there about how it should probably use `RowWithGetters` rather than `RowWithStorage`.

Issue Time Tracking
---
Worklog Id: (was: 334440)
Time Spent: 40m (was: 0.5h)

> [SQL] Add support for MongoDB source
>
> Key: BEAM-8427
> URL:
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=334438=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334438 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 23:59 Start Date: 25/Oct/19 23:59 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339268863 ## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; +import static org.junit.Assert.assertEquals; + +import com.mongodb.MongoClient; +import java.util.Arrays; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; +import org.apache.beam.sdk.io.mongodb.MongoDBIOIT.MongoDBPipelineOptions; +import org.apache.beam.sdk.io.mongodb.MongoDbIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.ToJson; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * A test of {@link org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable} on an + * independent Mongo instance. + * + * This test requires a running instance of MongoDB. 
Pass in connection information using + * PipelineOptions: + * + * + * ./gradlew integrationTest -p sdks/java/extensions/sql/integrationTest -DintegrationTestPipelineOptions='[ + * "--mongoDBHostName=1.2.3.4", + * "--mongoDBPort=27017", + * "--mongoDBDatabaseName=mypass", + * "--numberOfRecords=1000" ]' + * --tests org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbReadWriteIT + * -DintegrationTestRunner=direct + * + * + * Please see 'build_rules.gradle' file for instructions regarding running this test using Beam + * performance testing framework. + */ +@RunWith(JUnit4.class) +public class MongoDbReadWriteIT { + private static final Schema SOURCE_SCHEMA = + Schema.builder() + .addNullableField("_id", STRING) + .addNullableField("c_bigint", INT64) + .addNullableField("c_tinyint", BYTE) + .addNullableField("c_smallint", INT16) + .addNullableField("c_integer", INT32) + .addNullableField("c_float", FLOAT) + .addNullableField("c_double", DOUBLE) + .addNullableField("c_boolean", BOOLEAN) + .addNullableField("c_varchar", STRING) + .addNullableField("c_arr", FieldType.array(STRING)) + .build(); + private static final String collection = "collection"; + private static MongoDBPipelineOptions options; + + @Rule public final TestPipeline writePipeline = TestPipeline.create(); + @Rule public final
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=334436=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334436 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 23:59 Start Date: 25/Oct/19 23:59 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339272432 ## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTableTest.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb; + +import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; + +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class MongoDbTableTest { + + private static final Schema SCHEMA = + Schema.builder() + .addNullableField("long", INT64) + .addNullableField("int32", INT32) + .addNullableField("int16", INT16) + .addNullableField("byte", BYTE) + .addNullableField("bool", BOOLEAN) + .addNullableField("double", DOUBLE) + .addNullableField("float", FLOAT) + .addNullableField("string", STRING) + .addNullableField("arr", FieldType.array(STRING)) + .build(); + private static final String JSON_ROW = + "{ " + + "\"long\" : 9223372036854775807, " + + "\"int32\" : 2147483647, " + + "\"int16\" : 32767, " + + "\"byte\" : 127, " + + "\"bool\" : true, " + + "\"double\" : 1.0, " + + "\"float\" : 1.0, " + + "\"string\" : \"string\", " + + "\"arr\" : [\"str1\", \"str2\", \"str3\"]" + + " }"; + private static final MongoDbTable SQL_TABLE = + 
(MongoDbTable)
+          new MongoDbTableProvider()
+              .buildBeamSqlTable(
+                  fakeTable("TEST", "mongodb://localhost:27017/database/collection"));
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testDocumentToRowConverter() {
+    PCollection<String> output =
+        pipeline
+            .apply("Create document from JSON", Create.of(Document.parse(JSON_ROW)))
+            .apply("Decode document back into JSON", SQL_TABLE.createParserParDo());
+
+    // Make sure JSON was decoded correctly
+    PAssert.that(output).containsInAnyOrder(JSON_ROW);

Review comment: I think this is brittle, since the string comparison can fail if the keys are written out in a different order, and I don't think there's anything guaranteeing the order. Maybe instead verify that `output` and `JSON_ROW` can be parsed into equivalent rows?

This is a problem I keep running into as I'm writing tests with JSON, it would be nice to have some general purpose way to assert that two strings are equivalent JSON. Maybe we just need to write our own `Matcher` that parses to jackson nodes and compares. (You don't need to do that here
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=334442=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334442 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 23:59 Start Date: 25/Oct/19 23:59 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339272927
## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java ##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb;
+
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+import static org.junit.Assert.assertEquals;
+
+import com.mongodb.MongoClient;
+import java.util.Arrays;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.io.mongodb.MongoDBIOIT.MongoDBPipelineOptions;
+import org.apache.beam.sdk.io.mongodb.MongoDbIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.ToJson;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * A test of {@link org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable} on an
+ * independent Mongo instance.
+ *
+ * This test requires a running instance of MongoDB. Pass in connection information using
+ * PipelineOptions:
+ *
+ *
+ * ./gradlew integrationTest -p sdks/java/extensions/sql/integrationTest -DintegrationTestPipelineOptions='[
+ * "--mongoDBHostName=1.2.3.4",
+ * "--mongoDBPort=27017",
+ * "--mongoDBDatabaseName=mypass",
+ * "--numberOfRecords=1000" ]'
+ * --tests org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbReadWriteIT
+ * -DintegrationTestRunner=direct
+ *
+ *
+ * Please see 'build_rules.gradle' file for instructions regarding running this test using Beam
+ * performance testing framework.
+ */
+@RunWith(JUnit4.class)
+public class MongoDbReadWriteIT {
+  private static final Schema SOURCE_SCHEMA =
+      Schema.builder()
+          .addNullableField("_id", STRING)
+          .addNullableField("c_bigint", INT64)
+          .addNullableField("c_tinyint", BYTE)
+          .addNullableField("c_smallint", INT16)
+          .addNullableField("c_integer", INT32)
+          .addNullableField("c_float", FLOAT)
+          .addNullableField("c_double", DOUBLE)
+          .addNullableField("c_boolean", BOOLEAN)
+          .addNullableField("c_varchar", STRING)
+          .addNullableField("c_arr", FieldType.array(STRING))
+          .build();
+  private static final String collection = "collection";
+  private static MongoDBPipelineOptions options;
+
+  @Rule public final TestPipeline writePipeline = TestPipeline.create();
+  @Rule public final
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=334437=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334437 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 23:59 Start Date: 25/Oct/19 23:59 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#discussion_r339265846
## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.java ##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.mongodb.MongoDbIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.JsonToRow;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.SingleOutput;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting;
+import org.bson.Document;
+import org.bson.json.JsonMode;
+import org.bson.json.JsonWriterSettings;
+
+@Experimental
+public class MongoDbTable extends SchemaBaseBeamTable implements Serializable {
+  // Should match: mongodb://username:password@localhost:27017/database/collection
+  @VisibleForTesting
+  final Pattern locationPattern =
+      Pattern.compile(
+          "(?<credsHostPort>mongodb://(?.*:.*@)?.+:\\d+)/(?<database>.+)/(?<collection>.+)");
+
+  @VisibleForTesting final String dbCollection;
+  @VisibleForTesting final String dbName;
+  @VisibleForTesting final String dbUri;
+
+  MongoDbTable(Table table) {
+    super(table.getSchema());
+
+    String location = table.getLocation();
+    Matcher matcher = locationPattern.matcher(location);
+    checkArgument(
+        matcher.matches(),
+        "MongoDb location must be in the following format: 'mongodb://(username:password@)?localhost:27017/database/collection'");
+    this.dbUri = matcher.group("credsHostPort"); // "mongodb://localhost:27017"
+    this.dbName = matcher.group("database");
+    this.dbCollection = matcher.group("collection");
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    // Read MongoDb Documents
+    PCollection<Document> readDocuments =
+        MongoDbIO.read()
+            .withUri(dbUri)
+            .withDatabase(dbName)
+            .withCollection(dbCollection)
+            .expand(begin);
+
+    return readDocuments
+        // TODO: figure out a way convert Document directly to Row.
+        .apply("Convert Document to JSON", createParserParDo())
+        .apply("Transform JSON to Row", JsonToRow.withSchema(getSchema()))
+        .setRowSchema(getSchema());
+  }
+
+  @Override
+  public POutput buildIOWriter(PCollection<Row> input) {
+    throw new UnsupportedOperationException("Writing to a MongoDB is not supported");
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return IsBounded.BOUNDED;
+  }
+
+  @Override
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    return BeamTableStatistics.BOUNDED_UNKNOWN;
+  }
+
+  @VisibleForTesting
+  SingleOutput<Document, String> createParserParDo() {
+    return ParDo.of(new DocumentToJsonStringConverter());
+  }
+
+  // TODO: add support for complex fields.

Review comment:
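
As an aside for readers of this thread, the location parsing done in the `MongoDbTable` constructor above can be tried in isolation with a sketch like the following. It uses only `java.util.regex`. The group names `credsHostPort`, `database` and `collection` are the ones passed to `matcher.group(...)` in the diff; the name of the optional credentials group is not recoverable from the archived text, so the sketch assumes a non-capturing group there. `LocationParserSketch` and `MongoLocation` are hypothetical names used for illustration only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone sketch; not the class from the pull request.
final class LocationParserSketch {
  // Named groups mirror the names used with matcher.group(...) in the diff.
  // Assumption: the optional "username:password@" part is non-capturing here.
  private static final Pattern LOCATION =
      Pattern.compile(
          "(?<credsHostPort>mongodb://(?:.*:.*@)?.+:\\d+)/(?<database>.+)/(?<collection>.+)");

  /** Simple value holder for the three parts extracted from a location string. */
  static final class MongoLocation {
    final String uri;
    final String database;
    final String collection;

    MongoLocation(String uri, String database, String collection) {
      this.uri = uri;
      this.database = database;
      this.collection = collection;
    }
  }

  /** Splits a location string into URI, database and collection, or throws if malformed. */
  static MongoLocation parse(String location) {
    Matcher matcher = LOCATION.matcher(location);
    if (!matcher.matches()) {
      // Same exception type that Guava's checkArgument raises in the diff.
      throw new IllegalArgumentException(
          "Expected 'mongodb://(username:password@)?host:port/database/collection' but got: "
              + location);
    }
    return new MongoLocation(
        matcher.group("credsHostPort"), matcher.group("database"), matcher.group("collection"));
  }

  public static void main(String[] args) {
    MongoLocation loc =
        parse("mongodb://username:password@localhost:27017/database/collection");
    System.out.println(loc.uri + " | " + loc.database + " | " + loc.collection);
  }
}
```

Because the `.+` groups are greedy, a location with additional slashes assigns everything before the last slash to the database group, which may or may not be the intended behaviour.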
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=334377=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334377 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 21:20 Start Date: 25/Oct/19 21:20 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #9892: [BEAM-8427] [SQL] buildIOWrite from MongoDb Table URL: https://github.com/apache/beam/pull/9892 - Implemented write functionality for MongoDbTable. - Updated conversion logic for RowJsonSerializer. Based on top of #9806.
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=334373=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334373 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 25/Oct/19 21:13 Start Date: 25/Oct/19 21:13 Worklog Time Spent: 10m Work Description: apilloud commented on issue #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#issuecomment-546515561 R: @TheNeuralBit Brian, can you take a look at this? You are probably more familiar with JSON stuff than I am. Issue Time Tracking --- Worklog Id: (was: 334373) Time Spent: 20m (was: 10m) > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics
[jira] [Work logged] (BEAM-8427) [SQL] Add support for MongoDB source
[ https://issues.apache.org/jira/browse/BEAM-8427?focusedWorklogId=330104=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-330104 ] ASF GitHub Bot logged work on BEAM-8427: Author: ASF GitHub Bot Created on: 17/Oct/19 19:54 Start Date: 17/Oct/19 19:54 Worklog Time Spent: 10m Work Description: 11moon11 commented on issue #9806: [BEAM-8427] Create a table and a table provider for MongoDB URL: https://github.com/apache/beam/pull/9806#issuecomment-543335637 R: @apilloud cc: @amaliujia Issue Time Tracking --- Worklog Id: (was: 330104) Remaining Estimate: 0h Time Spent: 10m > [SQL] Add support for MongoDB source > > > Key: BEAM-8427 > URL: https://issues.apache.org/jira/browse/BEAM-8427 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Kirill Kozlov >Assignee: Kirill Kozlov >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > In progress: > * Create a MongoDB table and table provider. > * Implement buildIOReader > * Support primitive types > Still needs to be done: > * Implement buildIOWrite > * improve getTableStatistics