    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ComponentLog logger = getLogger();

        final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
        final String mode = context.getProperty(MODE).getValue();
        final String updateMode = context.getProperty(UPDATE_MODE).getValue();
        final WriteConcern writeConcern = getWriteConcern(context);

        final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);

        try {
            // Read the contents of the FlowFile into a byte array
            final byte[] content = new byte[(int) flowFile.getSize()];
            session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true));

            // parse
            final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
                    ? Document.parse(new String(content, charset)) : JSON.parse(new String(content, charset));

            if (MODE_INSERT.equalsIgnoreCase(mode)) {
                collection.insertOne((Document)doc);
                logger.info("inserted {} into MongoDB", new Object[] { flowFile });
            } else {
                // update
                final boolean upsert = context.getProperty(UPSERT).asBoolean();
                final String updateKey = context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                final String filterQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
                final Document query;

                if (!StringUtils.isBlank(updateKey)) {
                    query = parseUpdateKey(updateKey, (Map)doc, updateMode.equals(UPDATE_WITH_DOC.getValue()));
                    //removeUpdateKeys(updateKey, (Map)doc);
                    
                    
                } else {
                    query = Document.parse(filterQuery);
                }

                if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
                    collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert));
                } else {
                    BasicDBObject update = (BasicDBObject)doc;
//                    update.remove(updateKey);
                    if (!StringUtils.isBlank(updateKey)) {
                        removeUpdateKeys(updateKey, update);
                    }
                    collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
                }
                logger.info("updated {} into MongoDB", new Object[] { flowFile });
            }

            session.getProvenanceReporter().send(flowFile, getURI(context));
            session.transfer(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            logger.error("Failed to insert {} into MongoDB due to {}", new Object[] {flowFile, e}, e);
            session.transfer(flowFile, REL_FAILURE);
            context.yield();
        }
    }

    private void removeUpdateKeys(String updateKeyParam, Map doc) {
        String[] parts = updateKeyParam.split(",[\\s]*");
        for (String part : parts) {
            if (part.contains(".")) {
                doc.remove(part);
            }
        }
    }

    private void removeUpdateKeys(String updateKeyParam, BasicDBObject doc) {
        String[] parts = updateKeyParam.split(",[\\s]*");
        for (String part : parts) {
//            if (part.contains(".")) {
                doc.remove(part);
//            }
        }
    }
    
    private Document parseUpdateKey(String updateKey, Map doc, boolean wholeDoc) {
        Document retVal;
        if (updateKey.equals("_id") && ObjectId.isValid(((String) doc.get(updateKey)))) {
            retVal = new Document("_id", new ObjectId((String) doc.get(updateKey)));
        } else if (updateKey.contains(",")) {
            String[] parts = updateKey.split(",[\\s]*");
            retVal = new Document();
            for (String part : parts) {
                if (wholeDoc) {
                    retVal.append(part,getDocument(part, (Document)doc));
                } else {
                    retVal.append(part, doc.get(part));
                }
            }
        } else {
            if (wholeDoc) {
                retVal=new Document(updateKey, getDocument(updateKey, (Document)doc));
            } else {
                retVal = new Document(updateKey, doc.get(updateKey));
            }
        }

        return retVal;
    }
    
    private Object getDocument(String key, Document doc) {
        Object nestedDoc=doc;
        StringTokenizer st=new StringTokenizer(key,".");
        while (st.hasMoreElements()) {
            String k = (String) st.nextElement();
            nestedDoc=((Document)nestedDoc).get(k);
        }
        
        return nestedDoc;
    }
