[jira] [Commented] (BEAM-7256) Add support for allowDiskUse (AggregationOptions) in MongoDbIO
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17547776#comment-17547776 ]

Kenneth Knowles commented on BEAM-7256:
---------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/19503

> Add support for allowDiskUse (AggregationOptions) in MongoDbIO
> --------------------------------------------------------------
>
>                 Key: BEAM-7256
>                 URL: https://issues.apache.org/jira/browse/BEAM-7256
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-mongodb
>    Affects Versions: 2.12.0
>            Reporter: Javier Cornejo
>            Priority: P3
>         Attachments: Screen Shot 2019-05-09 at 12.30.51.png
>
> When a read is executed over a collection that exceeds MongoDB's in-memory sort limit of 104857600 bytes (100 MB), an exception occurs. MongoDB documents this limit, and the error can be avoided by passing AggregationOptions with allowDiskUse set to true so that Mongo can sort using disk.
> This should happen only when aggregations are added to the read, but it currently happens even without any aggregation at all.
> Please let me know how I can help with this improvement / bug.
>
> !Screen Shot 2019-05-09 at 12.30.51.png!
> {code:java}
> PCollection<Document> updateColls = p.apply("Reading Ops Collection: " + key,
>     MongoDbIO.read()
>         .withUri(options.getMongoDBUri())
>         .withDatabase("local")
>         .withCollection("oplog.rs")
>         .withBucketAuto(true)
>         // .withQueryFn(
>         //     FindQuery.create().withFilters(
>         //         Filters.and(
>         //             Filters.gt("ts", ts.format(dtf)),
>         //             Filters.eq("ns", options.getMongoDBDBName() + "" + key),
>         //             Filters.eq("op", "u")
>         //         )
>         //     )
>         //     AggregationQuery.create().withMongoDbPipeline(updatedDocsOplogAggregation)
>         // )
> );
> {code}

--
This message was sent by Atlassian Jira (v8.20.7#820007)
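The requested option would presumably follow MongoDbIO's existing withX builder pattern. Below is a purely illustrative, self-contained sketch of how such a flag could be threaded through an immutable builder; the Read class, its fields, and the withAllowDiskUse method are all hypothetical, not the actual MongoDbIO API.

```java
// Hypothetical sketch: threading an allowDiskUse flag through a
// builder-style reader. This is NOT the real MongoDbIO API; in a real
// implementation the expansion would eventually call
// AggregateIterable.allowDiskUse(true) on the driver.
public class AllowDiskUseSketch {
    static final class Read {
        final String uri;
        final boolean bucketAuto;
        final boolean allowDiskUse; // the proposed new option

        private Read(String uri, boolean bucketAuto, boolean allowDiskUse) {
            this.uri = uri;
            this.bucketAuto = bucketAuto;
            this.allowDiskUse = allowDiskUse;
        }

        static Read create() {
            return new Read(null, false, false);
        }

        Read withUri(String uri) {
            return new Read(uri, bucketAuto, allowDiskUse);
        }

        Read withBucketAuto(boolean bucketAuto) {
            return new Read(uri, bucketAuto, allowDiskUse);
        }

        Read withAllowDiskUse(boolean allowDiskUse) {
            return new Read(uri, bucketAuto, allowDiskUse);
        }
    }

    public static void main(String[] args) {
        Read read = Read.create()
            .withUri("mongodb://localhost:27017")
            .withBucketAuto(true)
            .withAllowDiskUse(true);
        System.out.println(read.allowDiskUse); // prints "true"
    }
}
```

The immutable copy-on-write style mirrors how Beam IO builders are typically written, so the option composes with any of the other withX calls in the snippet above.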
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387373#comment-17387373 ]

Ross Lawley commented on BEAM-7256:
-----------------------------------

Just to note: the [Apache Beam Python SDK|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py#L513-L514] sets the {{allowDiskUse}} flag. The [MongoDB Spark connector|https://github.com/mongodb/mongo-spark/blob/r3.0.1/src/main/scala/com/mongodb/spark/rdd/MongoRDD.scala#L191] also allows setting allowDiskUse (it defaults to true).

Ross
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17310996#comment-17310996 ]

Geoff Hutton commented on BEAM-7256:
------------------------------------

Thanks for the reply [~sandboxws]. The task I need to achieve is simply to read the _entire_ collection into the pipeline. Are you suggesting that I start multiple pipelines, each one reading a portion of the collection? There is no timestamp in my documents, and in fact no key that would guarantee for certain that the resulting subsets would be below the size limit.
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17294978#comment-17294978 ]

Ahmed El.Hussaini commented on BEAM-7256:
-----------------------------------------

@Geoff I don't recommend allowing disk usage, since it could potentially affect the performance of the cluster. Instead, I'd suggest scoping your query based on timestamps, on top of the filters you already have in your pipeline.

--
*Ahmed El.Hossaini*
@sandboxws
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17294012#comment-17294012 ]

Geoff Hutton commented on BEAM-7256:
------------------------------------

We are encountering exactly this issue trying to use MongoDbIO (in Python). We need to read from MongoDB Atlas, so we are required to pass "bucket_auto=True" (as mentioned above by [~sandboxws]). We get exactly the error described in this issue, because the collections we are reading are too large to be sorted in memory. Ideally we would like to be able to turn on allow_disk_use.
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17137291#comment-17137291 ]

Beam JIRA Bot commented on BEAM-7256:
-------------------------------------

This issue was marked "stale-P2" and has not received a public comment in 14 days. It has now been automatically moved to P3. If you are still affected by it, you can comment and move it back to P2.
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17122829#comment-17122829 ]

Beam JIRA Bot commented on BEAM-7256:
-------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days, so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3. Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17029349#comment-17029349 ]

Jeff Yemin commented on BEAM-7256:
----------------------------------

This is possible with the {{MongoCollection}}-based API:

{code:java}
MongoCollection<Document> coll = ...;
coll.aggregate(pipeline).allowDiskUse(true);
{code}
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842282#comment-16842282 ]

Ahmed El.Hussaini commented on BEAM-7256:
-----------------------------------------

[~iemejia] explain is a costly operation, since it's the equivalent of an SQL EXPLAIN, so it should never be on/true by default. Arguably it should only be allowed when using the direct runner.
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842237#comment-16842237 ]

Ismaël Mejía commented on BEAM-7256:
------------------------------------

Is explain a costly operation? And is it done once per query? Because we could do it as a default if it is not.
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842167#comment-16842167 ]

Ahmed El.Hussaini commented on BEAM-7256:
-----------------------------------------

[~jcornejo] since you're using "withBucketAuto(true)", the IO will internally use an aggregation, which will in turn result in an exception if the $sort stage exceeds 100MB.

I see that you're trying to tail the oplog here, which will be a huge collection in production, so I suggest you use filters to include only the oplog entries for a specific collection.

One thing that comes to mind, and that would help debug such cases in the future, is adding to the IO the ability to explain the query or aggregation. Example:

{code:java}
p.apply("ReadFromMongoDB", MongoDbIO.read()
    .withUri(options.getMongoDBUri())
    .withDatabase("local")
    .withCollection("oplog.rs")
    .withBucketAuto(true)
    .withExplain(true));
{code}

This would then log the explain output to standard output when running locally (i.e. options.getMongoDBUri() returns localhost).

[~jcornejo] On a side note, I'm not sure using Beam to tail the oplog is the right choice. You need the cursor option CursorType.TailableAwait, which is currently not supported by MongoDbIO. You might also need to replay the log, which again is not supported. What I suggest you do instead is the following:
* Tail the oplog using vanilla Java code.
* Publish batched oplog messages to Kafka, or preferably Pub/Sub.
* Consume the messages using a streaming pipeline running on Dataflow or Flink.

Note: If you're not using Atlas, then you can fix this issue by simply omitting withBucketAuto(true).
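The tail-and-relay architecture described above can be sketched in a minimal, self-contained form. In this sketch an in-memory BlockingQueue stands in for Kafka/Pub/Sub and a plain list stands in for the oplog tail; all names here are illustrative, and a real implementation would use the MongoDB driver's TailableAwait cursor and a real message bus.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class OplogRelaySketch {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for oplog entries read with a TailableAwait cursor.
        List<String> oplogEntries = List.of(
            "{op: 'u', ns: 'db.orders'}",
            "{op: 'u', ns: 'db.users'}");

        // Stand-in for Kafka / Pub/Sub.
        BlockingQueue<String> bus = new LinkedBlockingQueue<>();

        // Producer: tails the oplog and publishes each entry to the bus.
        Thread tailer = new Thread(() -> oplogEntries.forEach(bus::add));
        tailer.start();
        tailer.join();

        // Consumer: stand-in for the streaming pipeline reading the bus.
        List<String> consumed = new ArrayList<>();
        while (!bus.isEmpty()) {
            consumed.add(bus.take());
        }
        System.out.println(consumed.size()); // prints "2"
    }
}
```

Decoupling the tailer from the consumer this way is what lets the streaming pipeline replay or lag behind the oplog without holding a tailable cursor itself.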
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842153#comment-16842153 ]

Ismaël Mejía commented on BEAM-7256:
------------------------------------

Side note: looking at the [aggregate command|https://docs.mongodb.com/manual/reference/command/aggregate/] doc, the option exists; I just don't see how to use it with our current use of the MongoDB APIs.
[ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842152#comment-16842152 ]

Ismaël Mejía commented on BEAM-7256:
------------------------------------

The method that allows passing the options seems to be available only in the other Java API (the one with {{DBCollection}} and {{DBObject}}), but the IO uses the {{MongoCollection}} one. Not sure if there is a way to pass them; otherwise this should be a doable fix. Any idea, maybe [~sandboxws]?