I'm not sure what the right implementation is here, especially considering
the bigger picture of the Mongo processors. What appears to me to be a clear
need is to set specific fields without requiring the document being passed
in to contain MongoDB-specific syntax. There must be a more flexible
solution that can both continue to use Mongo Operators and also use NiFi
Expression Language.
I could imagine a drop-down combo box in the UI that asks which Mongo
Operator you'd like to use: $set, $unset, etc., then a text field
supporting Expression Language for the JSON values.
Example: {"id":"${id}"}, {"$set": {"field1":"anything"}} would be expressed
in the UI as:
Mode: update
Update Query: {"id":"${id}"}
Mongo Operator: $set (from DropDown)
Operator Update Values: {"field1":"anything"}
Update Mode: With operators enabled
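To make the idea concrete, here is a minimal sketch in plain Java of how the two proposed fields might be combined into the update document. The class name, the method name, and the short operator list are hypothetical and not part of PutMongo, and the sketch assumes the Expression Language in the values has already been evaluated:

```java
import java.util.Arrays;
import java.util.List;

public class OperatorUpdateSketch {

    // Hypothetical subset of operators the drop-down might offer.
    static final List<String> ALLOWED_OPERATORS = Arrays.asList("$set", "$unset", "$inc");

    /**
     * Wraps the "Operator Update Values" JSON in the chosen Mongo operator.
     * valuesJson is assumed to be a JSON object whose Expression Language
     * has already been evaluated against the FlowFile attributes.
     */
    static String buildUpdate(String operator, String valuesJson) {
        if (!ALLOWED_OPERATORS.contains(operator)) {
            throw new IllegalArgumentException("Unsupported operator: " + operator);
        }
        String trimmed = valuesJson.trim();
        // Only a shallow shape check; real validation would parse the JSON.
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            throw new IllegalArgumentException("Update values must be a JSON object");
        }
        return "{\"" + operator + "\": " + trimmed + "}";
    }

    public static void main(String[] args) {
        // Mirrors the UI example above.
        // prints {"$set": {"field1":"anything"}}
        System.out.println(buildUpdate("$set", "{\"field1\":\"anything\"}"));
    }
}
```

The resulting string could presumably be parsed the same way the FlowFile content is parsed today, which would leave the FlowFile content itself untouched for later in the flow.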
As an aside, the team has had some discussions on whether the MongoURI
should be marked as a sensitive value, since it contains the password, or
whether the password should be broken out into a separate sensitive field.
Going back to Andy's suggestion, that sounds great, but it doesn't keep my
initial JSON document I need later in the flow. I could split the
FlowFiles using an empty UpdateAttribute, and then do that, although it's a
lot of hoops to jump through.
Thanks,
Ryan
On Thu, Jun 28, 2018 at 11:19 AM Mike Thomsen <[email protected]>
wrote:
> There's also a (can't remember the name) processor or two that generates
> SQL operations. Something like that for Mongo would be appropriate. The
> question is how to approach that because you might want to do multiple
> operations in the same query. Expressing that in the processor UI could
> be non-intuitive. At first glance, I would suggest a many-to-one mapping with
> dynamic properties. Something like this:
>
> customID => $set
> logins => $inc
> bad_logins => $inc
> some_field => $unset
>
> We couldn't support EL there because it would break $set: in this
> case customID should mean "fetch attribute customID and $set its value", so
> ${customID} just drops the value into the field and sets null(?) as the
> value.
>
> Doing a JSON builder for the body is probably not necessary just due to
> the fact that we already have good ways to read the JSON and do it manually
> (Jolt and ExecuteScript as examples).
>
> Thoughts?
>
> On Thu, Jun 28, 2018 at 10:17 AM Otto Fowler <[email protected]>
> wrote:
>
>> So you want to set state not data.
>>
>>
>> On June 27, 2018 at 23:32:48, Ryan Hendrickson (
>> [email protected]) wrote:
>>
>> What we've got is a bunch of custom NiFi processors that are processing
>> data. As the data is processed, we currently use Spring to load up a DAO
>> to access Mongo and annotate that the file is complete. Then we send the
>> content to ElasticSearch. As it turns out, we can simplify our process
>> quite a bit by using standard processors instead of our custom ones, so
>> we're trying to pull the Mongo updates out of our custom code. We've already
>> got it set up to be flow-scoped with Expression Language to define the
>> collection and query id, but not the actual update itself. We want to be
>> able to dynamically set different fields on the update based on NiFi
>> attributes, like ${progress}, ${priority}, etc.
>>
>> The issue I'm having isn't that I need to extract JSON from the FlowFile
>> Content into an attribute, it's that the FlowFile Attribute values need to
>> be stored in Mongo, and in the current implementation, updates to Mongo
>> appear to be possible only from the FlowFile Content that is read in.
>>
>>
>> On Wed, Jun 27, 2018 at 11:10 PM Andy LoPresto <[email protected]>
>> wrote:
>>
>>> Ryan,
>>>
>>> I believe what you are describing as the current behavior is accurate. I
>>> don’t quite follow your request to allow a property on the processor to
>>> define the content that gets used in the query, as that seems like it would
>>> be fairly static. However, I am not in the key demographic for the Mongo
>>> processors (as in, I only stand it up to review someone’s PR). In general,
>>> NiFi processor properties are designed to be “flow-scoped” rather than
>>> “single iteration of an operation-scoped” — i.e. flowfiles are flowing
>>> through processors and each respective operation is performed given the
>>> context at the time, rather than a job scheduling or “one-time activity”
>>> tool. Maybe that’s where the disconnect is if your request is more along
>>> those lines.
>>>
>>> The Update Query property does support NiFi Expression Language though,
>>> so you could set that property value to be “${update_query}” and ensure
>>> that the update_query attribute is set on incoming flowfiles. For each
>>> flowfile, the operation would occur with that dynamic query. You could use
>>> the EvaluateJsonPath processor preceding this to extract JSON components
>>> from flowfile content into an attribute.
>>>
>>> If you need an immediate fix, you can use the GenerateFlowFile processor
>>> to generate a flowfile which has the static content you’re looking for, and
>>> pass that to the PutMongo processor.
>>>
>>>
>>> Andy LoPresto
>>> [email protected]
>>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>>>
>>> On Jun 27, 2018, at 7:45 PM, Ryan Hendrickson <
>>> [email protected]> wrote:
>>>
>>> I think something must be getting lost in my email. The $set operator
>>> does work. I tested it today; however, the NiFi processor requires that
>>> input to be passed in as the FlowFile. What I'm recommending is that,
>>> instead of using the FlowFile Content as the doc that updates the db, it
>>> could alternatively be a NiFi Property that's set.
>>>
>>> Check out the following key lines..
>>>
>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
>>>
>>> Line 197 session.read(flowFile, in ->
>>> StreamUtils.fillBuffer(in, content, true));
>>> Line 200 final Object doc = (mode.equals(MODE_INSERT) ||
>>> (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
>>> Line 201 ? Document.parse(new String(content,
>>> charset)) : JSON.parse(new String(content, charset));
>>> Line 223 BasicDBObject update = (BasicDBObject)doc;
>>> Line 225 collection.updateOne(query, update, new
>>> UpdateOptions().upsert(upsert));
>>>
>>> So on Line 223, if I'm reading this right, instead of using the doc,
>>> which is the FlowFile content, just grab the update portion of the syntax,
>>> expressed as a NiFi Property UPDATE ( {"$set": {"status":"Stage_2"}} ),
>>> such as: context.getProperty(UPDATE).getValue(), and cast that to
>>> BasicDBObject, instead of doc.
>>>
>>> Does that describe a little better what I mean?
>>>
>>>
>>> On Wed, Jun 27, 2018 at 7:37 PM Mike Thomsen <[email protected]>
>>> wrote:
>>>
>>>> Ryan,
>>>>
>>>> FWIW, Mongo's own Java client doesn't work like that. See the update
>>>> methods it exposes here:
>>>>
>>>>
>>>> http://api.mongodb.com/java/current/com/mongodb/client/MongoCollection.html#findOneAndUpdate-org.bson.conversions.Bson-org.bson.conversions.Bson-
>>>>
>>>> The single parameter update() method doesn't exist on MongoCollection.
>>>>
>>>> It can be pretty tricky going back and forth between the Mongo shell
>>>> and the client APIs.
>>>>
>>>> On Wed, Jun 27, 2018 at 5:31 PM Ryan Hendrickson <
>>>> [email protected]> wrote:
>>>>
>>>>> Mike, Matt,
>>>>> Thanks for the help. Mike - the one json thing was a typo.
>>>>>
>>>>> I just got this working as the following:
>>>>>
>>>>> File on disk, testQuery.json: {"$set":{"status":"Stage_2"}}
>>>>> GetFile ----FlowFile---> PutMongo
>>>>>
>>>>> NiFi Processor Properties:
>>>>> Mode: update
>>>>> Upsert: false
>>>>> Update Query Key: No value set
>>>>> Update Query: {"customId":"abc"}
>>>>> Update Mode: With operators enabled
>>>>>
>>>>> This feels like a pretty contrived example here that got it
>>>>> working. I'm not aware of any of the current processors that output with
>>>>> the Mongo Operators in a JSON file, which would mean if I'm operating on
>>>>> straight JSON, I'd have to manipulate it in some way to get that in there.
>>>>> That seems fairly complicated. Is there a simpler way to do this and
>>>>> I'm missing it?
>>>>>
>>>>> Just some background --
>>>>> What I'm trying to do:
>>>>> I have the ID of a record in the database, and I only want to
>>>>> update 1 field. I could take the ID and do a GetMongo -> JoltTransform ->
>>>>> PutMongo and replace the entire document, but that seems like a lot of
>>>>> processors for this use-case.
>>>>>
>>>>> My initial approach/recommendation:
>>>>> I initially thought the Update Query property would take what
>>>>> Mongo's CLI is expecting {"customId":{"$customId"},{"$set":
>>>>> {"status":"Stage_2"}}. Instead, it works by splitting on the comma - (1)
>>>>> Query as Property, (2) Update with Operator as FlowFile. I think
>>>>> supporting an update using operators that's set within the NiFi Processor
>>>>> Properties, vs in the incoming FlowFile could make this processor a lot
>>>>> more flexible.
>>>>>
>>>>> Thanks,
>>>>> Ryan
>>>>>
>>>>> On Wed, Jun 27, 2018 at 4:26 PM Matt Burgess <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> If "customId" is a flow file attribute (and the property supports
>>>>>> expression language), then you just have your braces and $ swapped,
>>>>>> try ${customId} instead of {$customId}
>>>>>> On Wed, Jun 27, 2018 at 4:15 PM Mike Thomsen <[email protected]>
>>>>>> wrote:
>>>>>> >
>>>>>> > > can you clarify if you mean the NiFi Processor Property "Update
>>>>>> Query", or the FlowFile require proper json?
>>>>>> >
>>>>>> > Both.
>>>>>> >
>>>>>> > > I'm getting an error: MongoDB due to readStartDocument can only
>>>>>> be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is
>>>>>> Array.
>>>>>> >
>>>>>> > PutMongo does not support arrays.
>>>>>> >
>>>>>> > > I'm trying to do the latter, "specify a document that contains
>>>>>> update operators"... On the mongo command line, the update would be:
>>>>>> > > db.collection.update({"customId":{$customId},{$set:
>>>>>> {"status":"Stage_2"}});
>>>>>> >
>>>>>> > I don't know if that's a typo, but it should be
>>>>>> {"customId":{$customId}} followed by {$set: {"status":"Stage_2"}}. Your
>>>>>> version was one big document. What we would expect is a query like {
>>>>>> "customId": "XYZ"}. As mentioned, $customId is not a valid JSON value.
>>>>>> >
>>>>>> > Let me know if that helps.
>>>>>> >
>>>>>> > Mike
>>>>>> >
>>>>>> > On Wed, Jun 27, 2018 at 3:15 PM Ryan Hendrickson <
>>>>>> [email protected]> wrote:
>>>>>> >>
>>>>>> >> Hi Mike,
>>>>>> >> Just curious - any other suggestions?
>>>>>> >>
>>>>>> >> Thanks,
>>>>>> >> Ryan
>>>>>> >>
>>>>>> >> On Thu, Jun 21, 2018 at 5:23 PM Ryan Hendrickson <
>>>>>> [email protected]> wrote:
>>>>>> >>>
>>>>>> >>> Thanks for the suggestions, can you clarify if you mean the NiFi
>>>>>> Processor Property "Update Query", or the FlowFile require proper json?
>>>>>> I'm not sure how to get proper json with the $set in there.
>>>>>> >>>
>>>>>> >>> I made the following modifications based on it:
>>>>>> >>>
>>>>>> >>> NiFi Processors Properties:
>>>>>> >>> Update Query: [{"customId":{$customId}},{"$set":
>>>>>> {"status":"Stage_2"}}]
>>>>>> >>> Update Mode: With operators enabled --- Confirmed that I've
>>>>>> been using this.
>>>>>> >>>
>>>>>> >>> FlowFile Contents: [{"customId":{$customId}},{"$set":
>>>>>> {"status":"Stage_2"}}]
>>>>>> >>>
>>>>>> >>> I'm getting an error: MongoDB due to readStartDocument can only
>>>>>> be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is
>>>>>> Array.
>>>>>> >>>
>>>>>> >>> The NiFi Docs for Put Mongo say [1]:
>>>>>> >>> Update Query: Specify a full MongoDB query to be used for the
>>>>>> lookup query to do an update/upsert. Supports Expression Language: true
>>>>>> >>> Update Mode: Choose an update mode. You can either supply a JSON
>>>>>> document to use as a direct replacement or specify a document that
>>>>>> contains update operators like $set and $unset.
>>>>>> >>>
>>>>>> >>> I'm trying to do the latter, "specify a document that contains
>>>>>> update operators"... On the mongo command line, the update would be:
>>>>>> >>> db.collection.update({"customId":{$customId},{$set:
>>>>>> {"status":"Stage_2"}});
>>>>>> >>>
>>>>>> >>> In the NiFi flow, all I have is the customId, and I want to set a
>>>>>> status in the database when I receive it, but the database has a larger
>>>>>> set of doc keys/values. I know I could do GetMongo -> JoltTransform for
>>>>>> status -> PutMongo, but it seems silly to use 3 processors when this
>>>>>> PutMongo looks like it can do it...
>>>>>> >>>
>>>>>> >>> Thanks,
>>>>>> >>> Ryan
>>>>>> >>>
>>>>>> >>> [1]
>>>>>> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-mongodb-nar/1.6.0/org.apache.nifi.processors.mongodb.PutMongo/index.html
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Thu, Jun 21, 2018 at 4:57 PM Mike Thomsen <
>>>>>> [email protected]> wrote:
>>>>>> >>>>
>>>>>> >>>> Two things:
>>>>>> >>>>
>>>>>> >>>> 1. You need to use valid JSON. Your query is not a valid JSON
>>>>>> example because some of the values are not quoted.
>>>>>> >>>> 2. You need to make sure the update option is set to use
>>>>>> operators, not use document.
>>>>>> >>>>
>>>>>> >>>> Let us know if that helps.
>>>>>> >>>>
>>>>>> >>>> Mike
>>>>>> >>>>
>>>>>> >>>> On Thu, Jun 21, 2018 at 3:19 PM Ryan Hendrickson <
>>>>>> [email protected]> wrote:
>>>>>> >>>>>
>>>>>> >>>>> Hi,
>>>>>> >>>>> I can't seem to figure out the right combo of parameters to
>>>>>> get a document to update in Mongo using the PutMongo processor and the
>>>>>> $set operator.
>>>>>> >>>>>
>>>>>> >>>>> Try 1:
>>>>>> >>>>> The incoming flowfile contains the customId: abc
>>>>>> >>>>>
>>>>>> >>>>> NiFi Processor Properties:
>>>>>> >>>>> Mode: update
>>>>>> >>>>> Upsert: false
>>>>>> >>>>> Update Query Key: No value set
>>>>>> >>>>> Update Query: {"customId":{$customId}},{$set:
>>>>>> {"status":"Stage_2"}}
>>>>>> >>>>> Update Mode: With operators enabled
>>>>>> >>>>>
>>>>>> >>>>> This consistently fails, the abbreviated log output:
>>>>>> >>>>> PutMongo Failed to insert into MongoDB due to
>>>>>> com.mongodb.util.JSONParseException:
>>>>>> >>>>> abc
>>>>>> >>>>> ...
>>>>>> >>>>> at com.mongodb.util.JSONParser.parse(JSON.java:230)
>>>>>> >>>>> ...
>>>>>> >>>>> at
>>>>>> org.apache.nifi.processors.mongodb.PutMongo.onTrigger(PutMongo.java:201)
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java#L201
>>>>>> >>>>>
>>>>>> >>>>> It looks like it's trying to parse the incoming flowfile as a
>>>>>> JSON document with the above parameters set.
>>>>>> >>>>>
>>>>>> >>>>> Try 2:
>>>>>> >>>>> With that in mind, I changed my input flowfile to be a json
>>>>>> object, but I don't think it should need to be, because I'm using the
>>>>>> Update Query with Operators.
>>>>>> >>>>> New incoming flow file: {"customId":"abc"}
>>>>>> >>>>>
>>>>>> >>>>> This allows it to get to line 225, before it fails with:
>>>>>> >>>>> PutMongo Failed to insert into MongoDB due to
>>>>>> java.lang.IllegalArgumentException: Invalid BSON field name customId:
>>>>>> >>>>> at
>>>>>> org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:494)
>>>>>> >>>>> at
>>>>>> org.apache.nifi.processors.mongodb.PutMongo.onTrigger(PutMongo:225)
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java#L225
>>>>>> >>>>>
>>>>>> >>>>> Line 225: //collection.updateOne(query, update, new
>>>>>> UpdateOptions().upsert(upsert));
>>>>>> >>>>> query = {"customId":{$customId}},{$set: {"status":"Stage_2"}}
>>>>>> >>>>> update = {"customId":"abc"}
>>>>>> >>>>> It looks like the 'update' variable is my incoming flowfile.
>>>>>> I'm not sure why it would be, based on my understanding of how the
>>>>>> processor properties work.
>>>>>> >>>>>
>>>>>> >>>>> If anyone has any insight on how to set this up for using the
>>>>>> operators to update a document, I'd really appreciate the insight. I'm
>>>>>> lost in debugging.
>>>>>> >>>>>
>>>>>> >>>>> Thanks,
>>>>>> >>>>> Best,
>>>>>> >>>>> Ryan
>>>>>> >>>>>
>>>>>>
>>>>>
>>>