[jira] [Commented] (NIFI-7148) Invalid config event received - zookeeper
[ https://issues.apache.org/jira/browse/NIFI-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099529#comment-17099529 ]

Joshua Villanueva commented on NIFI-7148:
-----------------------------------------

Hi [~danoyoung], what specific properties were you referring to? I'm having the same issue with no other leads so far. Two nodes are connected to the NiFi cluster while the other one can't due to this error.

> Invalid config event received - zookeeper
> -----------------------------------------
>
>                 Key: NIFI-7148
>                 URL: https://issues.apache.org/jira/browse/NIFI-7148
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.11.1
>            Reporter: dan young
>            Priority: Minor
>
> We're seeing the following messages in the NiFi logs on our cluster nodes. We
> have 3 nodes running NiFi 1.11.1 and ZooKeeper (not embedded) version 3.5.6.
> Functionality seems not to be impacted, but we're wondering if there's
> something else going on, or whether the version of ZooKeeper we're using is
> causing this.
>
> 2020-02-12 15:36:43,959 ERROR [main-EventThread]
> o.a.c.framework.imps.EnsembleTracker Invalid config event received:
> {server.1=10.190.3.170:2888:3888:participant, version=0,
> server.3=10.190.3.91:2888:3888:participant,
> server.2=10.190.3.172:2888:3888:participant}
>
> Regards,
> Dano

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
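[Editor's context, not part of the ticket: the map in the logged error is ZooKeeper 3.5's dynamic ensemble configuration, which Curator's EnsembleTracker parses. A 3.5-style server entry carries the role and, optionally, a client binding after a semicolon; one commonly reported trigger for this message is entries that omit the client-address suffix, as the ones in the log above do. The IPs below are taken from the ticket; the client bindings are illustrative.]

```properties
# ZooKeeper 3.5+ ensemble entries (zoo.cfg or zoo.cfg.dynamic).
# Format: server.<id>=<host>:<quorumPort>:<electionPort>:<role>[;<clientAddr>:<clientPort>]
server.1=10.190.3.170:2888:3888:participant;0.0.0.0:2181
server.2=10.190.3.172:2888:3888:participant;0.0.0.0:2181
server.3=10.190.3.91:2888:3888:participant;0.0.0.0:2181
```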
[GitHub] [nifi] alopresto commented on a change in pull request #4253: NIFI-7406: PutAzureCosmosRecord Processor
alopresto commented on a change in pull request #4253: URL: https://github.com/apache/nifi/pull/4253#discussion_r419785768 ## File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.azure.cosmos.document; + +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import com.azure.cosmos.ConnectionPolicy; +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosClientException; +import com.azure.cosmos.CosmosContainer; +import com.azure.cosmos.CosmosDatabase; +import com.azure.cosmos.models.CosmosContainerProperties; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.cosmos.AzureCosmosDBConnectionService; + +public abstract class AbstractAzureCosmosDBProcessor extends AbstractProcessor { +static final String CONSISTENCY_STRONG = "STRONG"; +static final String CONSISTENCY_BOUNDED_STALENESS= "BOUNDED_STALENESS"; +static final String CONSISTENCY_SESSION = "SESSION"; +static final String CONSISTENCY_CONSISTENT_PREFIX = "CONSISTENT_PREFIX"; +static final String CONSISTENCY_EVENTUAL = "EVENTUAL"; + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Azure Cosmos DB are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") 
+.description("All FlowFiles that cannot be written to Azure Cosmos DB are routed to this relationship").build(); + +static final Relationship REL_ORIGINAL = new Relationship.Builder() +.name("original") +.description("All input FlowFiles that are part of a successful query execution go here.") +.build(); + +static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() +.name("azure-cosmos-db-connection-service") Review comment: Thanks for including clear `name` and `displayName` values for each of these property descriptors. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
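[Editor's note on the review comment above about `name` and `displayName`: the sketch below is a hypothetical, self-contained mirror of the builder pattern involved, not the real `org.apache.nifi.components.PropertyDescriptor` API. It illustrates why both values exist: `name` is the stable programmatic identifier that serialized flows reference, while `displayName` is the human-readable label that can change freely. As I understand NiFi's builder, the display name falls back to the name when unset, which this sketch mirrors.]

```java
// Hypothetical sketch of a PropertyDescriptor-style builder (illustration only).
public class PropertyDescriptorSketch {

    public static final class Descriptor {
        public final String name;        // stable id, used in flow.xml / programmatic access
        public final String displayName; // UI label, safe to rename later
        Descriptor(String name, String displayName) {
            this.name = name;
            this.displayName = displayName;
        }
    }

    public static final class Builder {
        private String name;
        private String displayName;
        public Builder name(String n) { this.name = n; return this; }
        public Builder displayName(String d) { this.displayName = d; return this; }
        public Descriptor build() {
            // Display name defaults to the stable name when not set explicitly.
            return new Descriptor(name, displayName != null ? displayName : name);
        }
    }

    public static void main(String[] args) {
        Descriptor d = new Builder()
                .name("azure-cosmos-db-connection-service")
                .displayName("Cosmos DB Connection Service")
                .build();
        System.out.println(d.name + " / " + d.displayName);
    }
}
```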
[GitHub] [nifi] alopresto commented on pull request #4253: NIFI-7406: PutAzureCosmosRecord Processor
alopresto commented on pull request #4253: URL: https://github.com/apache/nifi/pull/4253#issuecomment-623759235

I updated the PR title as I believe it was linked to the wrong Jira (NIFI-7407 is not related to this work).
[GitHub] [nifi] sjyang18 opened a new pull request #4253: NIFI-7407: PutAzureCosmosRecord Processor
sjyang18 opened a new pull request #4253: URL: https://github.com/apache/nifi/pull/4253

Thank you for submitting a contribution to Apache NiFi. Please provide a short description of the PR here:

Description of PR: Functionally it is equivalent to the PutMongoRecord processor in nifi-mongodb-bundle, but this is a record-oriented Put processor that uses the Cosmos DB native SQL API.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [X] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [X] Has your PR been rebased against the latest commit within the target branch (typically `master`)?
- [X] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._

### For code changes:
- [X] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [X] Have you written or updated unit tests to verify your changes?
- [X] Have you verified that the full build is successful on both JDK 8 and JDK 11?
- [X] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
- [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
- [ ] If adding new Properties, have you added `.displayName` in addition to `.name` (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #775: MINIFICPP-965 Add other supported relationships to InvokeHTTP processor.
msharee9 commented on a change in pull request #775: URL: https://github.com/apache/nifi-minifi-cpp/pull/775#discussion_r419750421 ## File path: libminifi/test/resources/TestInvokeHTTPPost.yml ## @@ -0,0 +1,161 @@ +MiNiFi Config Version: 3 +Flow Controller: + name: c++lw + comment: Created by MiNiFi C2 Flow Designer +Core Properties: + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis + max concurrent threads: 1 + variable registry properties: '' +FlowFile Repository: + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: +threshold: 2 +in period: 5 sec +in threads: 1 +out period: 5 sec +out threads: 4 +Content Repository: + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false +Provenance Repository: + provenance rollover time: 1 min + implementation: org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository +Component Status Repository: + buffer size: 1440 + snapshot frequency: 1 min +Security Properties: + keystore: '' + keystore type: '' + keystore password: '' + key password: '' + truststore: '' + truststore type: '' + truststore password: '' + ssl protocol: '' + Sensitive Props: +key: +algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL +provider: BC +Processors: +- id: 4ed2d51d-076a-49b0-88de-5cf5adf52a7e + name: GenerateFlowFile + class: org.apache.nifi.minifi.processors.GenerateFlowFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 15000 ms + penalization period: 1000 ms + yield period: 1000 ms + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: +Batch Size: '1' +Data Format: Binary +File Size: 1 kB +Unique FlowFiles: 'true' +- id: 1d51724d-dd76-46a0-892d-a7c7408d58dd + name: InvokeHTTP + class: org.apache.nifi.minifi.processors.InvokeHTTP + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling 
period: 1000 ms + penalization period: 1000 ms + yield period: 1000 ms + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: +Always Output Response: 'false' +Connection Timeout: 3 s +Content-type: application/octet-stream +Disable Peer Verification: 'false' +HTTP Method: POST +Include Date Header: 'true' +Read Timeout: 4 s +Remote URL: http://localhost:10033/minifi

Review comment: Agree with you on this. Changed the behavior to use generated port numbers.
[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #775: MINIFICPP-965 Add other supported relationships to InvokeHTTP processor.
msharee9 commented on a change in pull request #775: URL: https://github.com/apache/nifi-minifi-cpp/pull/775#discussion_r419743218

## File path: extensions/http-curl/client/HTTPClient.cpp

@@ -296,12 +296,13 @@ bool HTTPClient::submit() {
   }
   curl_easy_setopt(http_session_, CURLOPT_HEADERFUNCTION, ::HTTPHeaderResponse::receive_headers);
   curl_easy_setopt(http_session_, CURLOPT_HEADERDATA, static_cast(_response_));
-  if (keep_alive_probe_ > 0){
-    logger_->log_debug("Setting keep alive to %d",keep_alive_probe_);
+  if (keep_alive_probe_.count() > 0) {
+    const auto keepAlive = std::chrono::duration_cast>(keep_alive_probe_);

Review comment: Agree that std::chrono::seconds is better here. However, on a side note, std::chrono::seconds is a std::duration (see https://gcc.gnu.org/onlinedocs/gcc-4.6.3/libstdc++/api/a00798_source.html#l00508). It does not matter here whether we use int64_t or uint64_t, because we are only checking for a negative value.
[jira] [Created] (NIFI-7424) PutSQL - Flowfiles stuck in incoming queue due to java.sql.SQLException: Can't call commit when autocommit=true
Ashish created NIFI-7424:
----------------------------

             Summary: PutSQL - Flowfiles stuck in incoming queue due to java.sql.SQLException: Can't call commit when autocommit=true
                 Key: NIFI-7424
                 URL: https://issues.apache.org/jira/browse/NIFI-7424
             Project: Apache NiFi
          Issue Type: Bug
    Affects Versions: 1.11.4, 1.10.0
            Reporter: Ashish
         Attachments: Data Flow.png, Error.png, PutSQL Properties.png, nifi-app.log

For the standard processor PutSQL, with the property "Database Session AutoCommit" set to TRUE, the incoming flowfile is not allowed to move to any of the relationships (SUCCESS, FAILURE, RETRY), since the processor throws the following error:

PutSQL[id=] Failed to process session due to Failed to commit database connection due to java.sql.SQLException: Can't call commit when autocommit=true: org.apache.nifi.processor.exception.ProcessException: Failed to commit database connection due to java.sql.SQLException: Can't call commit when autocommit=true

"Rollback On Failure" and "Support Fragmented Transactions" are set to false, with a "Batch Size" value of 1.

Flowfile content:
INSERT INTO user_nifi (id, name) VALUES (?, ?)

Flowfile attributes (relevant):
sql.args.1.type 4
sql.args.1.value 25
sql.args.2.type 12
sql.args.2.value Georgie
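[Editor's context: the sketch below is not NiFi's actual PutSQL code. It mirrors the JDBC contract behind this ticket: `Connection.commit()` must throw `SQLException` when auto-commit is enabled, so a processor that commits unconditionally can neither succeed nor route the FlowFile onward. The `FakeConnection` class is a hypothetical stand-in for the relevant part of `java.sql.Connection`; `safeCommit` shows the kind of guard a fix would need.]

```java
import java.sql.SQLException;

public class AutoCommitSketch {

    // Hypothetical stand-in for java.sql.Connection's commit contract.
    public static final class FakeConnection {
        public boolean autoCommit = true;
        public void commit() throws SQLException {
            if (autoCommit) {
                throw new SQLException("Can't call commit when autocommit=true");
            }
        }
    }

    // Only commit when we own the transaction (auto-commit disabled).
    public static boolean safeCommit(FakeConnection conn) {
        try {
            if (!conn.autoCommit) {
                conn.commit();
            }
            return true;
        } catch (SQLException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        FakeConnection conn = new FakeConnection(); // autoCommit=true, as in the ticket
        try {
            conn.commit();                          // unconditional commit: throws
        } catch (SQLException e) {
            System.out.println(e.getMessage());     // Can't call commit when autocommit=true
        }
        System.out.println(safeCommit(conn));       // true: guard skips the commit
    }
}
```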
[jira] [Assigned] (NIFI-4307) Add Kotlin support to ExecuteScript
[ https://issues.apache.org/jira/browse/NIFI-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mike Thomsen reassigned NIFI-4307:
----------------------------------

    Assignee: Mike Thomsen

> Add Kotlin support to ExecuteScript
> -----------------------------------
>
>                 Key: NIFI-4307
>                 URL: https://issues.apache.org/jira/browse/NIFI-4307
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Mike Thomsen
>            Assignee: Mike Thomsen
>            Priority: Minor
>
> Kotlin has a ScriptEngine implementation as of v1.1. Add support for it in
> NiFi.
[jira] [Commented] (NIFI-4307) Add Kotlin support to ExecuteScript
[ https://issues.apache.org/jira/browse/NIFI-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099274#comment-17099274 ]

Mike Thomsen commented on NIFI-4307:
------------------------------------

Going to revisit this after working on #5422

> Add Kotlin support to ExecuteScript
> -----------------------------------
>
>                 Key: NIFI-4307
>                 URL: https://issues.apache.org/jira/browse/NIFI-4307
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Mike Thomsen
>            Assignee: Mike Thomsen
>            Priority: Minor
>
> Kotlin has a ScriptEngine implementation as of v1.1. Add support for it in
> NiFi.
[jira] [Assigned] (NIFI-5422) Make ExecuteScript thread safe & use CompiledScript where possible
[ https://issues.apache.org/jira/browse/NIFI-5422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mike Thomsen reassigned NIFI-5422:
----------------------------------

    Assignee: Mike Thomsen

> Make ExecuteScript thread safe & use CompiledScript where possible
> ------------------------------------------------------------------
>
>                 Key: NIFI-5422
>                 URL: https://issues.apache.org/jira/browse/NIFI-5422
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Ramon Havermans
>            Assignee: Mike Thomsen
>            Priority: Major
>
> In ExecuteScript, Bindings are reused, which makes it non-thread-safe. Most
> engines can use a compiled version of the script, which makes it faster.
> Pull request will follow.
[jira] [Updated] (NIFI-7414) NiFi fails to startup if it encounters some unicode characters in the flow.xml.gz
[ https://issues.apache.org/jira/browse/NIFI-7414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Gilman updated NIFI-7414: -- Fix Version/s: 1.12.0 Resolution: Fixed Status: Resolved (was: Patch Available) > NiFi fails to startup if it encounters some unicode characters in the > flow.xml.gz > - > > Key: NIFI-7414 > URL: https://issues.apache.org/jira/browse/NIFI-7414 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Fix For: 1.12.0 > > Time Spent: 1h 20m > Remaining Estimate: 0h > > There are some elements where NiFi does not strip out invalid XML characters > when writing out the flow.xml.gz. For example, if a variable is created and > the variable's value has a unicode 0x0001 character in it, NiFi will accept > the input but then will fail to restart with the following stack trace: > {quote} > 2020-04-30 13:06:16,834 WARN [main] org.apache.nifi.web.server.JettyServer > Failed to start web server... shutting down. > org.apache.nifi.controller.serialization.FlowSerializationException: > org.xml.sax.SAXParseException; lineNumber: 51; columnNumber: 30; Character > reference
[jira] [Commented] (NIFI-7414) NiFi fails to startup if it encounters some unicode characters in the flow.xml.gz
[ https://issues.apache.org/jira/browse/NIFI-7414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099245#comment-17099245 ] ASF subversion and git services commented on NIFI-7414: --- Commit 0448e23a963a4f34f12cfa96e13d81675eb2b33b in nifi's branch refs/heads/master from Mark Payne [ https://gitbox.apache.org/repos/asf?p=nifi.git;h=0448e23 ] NIFI-7414: Escape user-defined values that contain invalid XML characters before writing flow.xml.gz NIFI-7414: Updated StandardFlowSerializerTest to include testing for variable names and values being filtered This closes #4244 > NiFi fails to startup if it encounters some unicode characters in the > flow.xml.gz > - > > Key: NIFI-7414 > URL: https://issues.apache.org/jira/browse/NIFI-7414 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > > There are some elements where NiFi does not strip out invalid XML characters > when writing out the flow.xml.gz. For example, if a variable is created and > the variable's value has a unicode 0x0001 character in it, NiFi will accept > the input but then will fail to restart with the following stack trace: > {quote} > 2020-04-30 13:06:16,834 WARN [main] org.apache.nifi.web.server.JettyServer > Failed to start web server... shutting down. > org.apache.nifi.controller.serialization.FlowSerializationException: > org.xml.sax.SAXParseException; lineNumber: 51; columnNumber: 30; Character > reference
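[Editor's illustration, not the actual StandardFlowSerializer change from the commit above: NIFI-7414 is about dropping characters that are illegal in XML 1.0 (such as U+0001) from user-defined values before writing flow.xml.gz. A minimal sketch of that kind of filter, using the XML 1.0 `Char` production's legal ranges:]

```java
public class XmlCharFilterSketch {

    // Keep only characters legal in XML 1.0:
    // #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF]
    public static String stripInvalidXmlChars(String in) {
        StringBuilder sb = new StringBuilder(in.length());
        for (int i = 0; i < in.length(); ) {
            int cp = in.codePointAt(i);
            boolean valid = cp == 0x9 || cp == 0xA || cp == 0xD
                    || (cp >= 0x20 && cp <= 0xD7FF)
                    || (cp >= 0xE000 && cp <= 0xFFFD)
                    || (cp >= 0x10000 && cp <= 0x10FFFF);
            if (valid) {
                sb.appendCodePoint(cp);
            }
            i += Character.charCount(cp); // advance past surrogate pairs correctly
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // U+0001 is the character from the ticket's example variable value.
        System.out.println(stripInvalidXmlChars("a\u0001b")); // ab
    }
}
```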
[GitHub] [nifi] mcgilman commented on pull request #4244: NIFI-7414: Escape user-defined values that contain invalid XML charac…
mcgilman commented on pull request #4244: URL: https://github.com/apache/nifi/pull/4244#issuecomment-623640690

Thanks for the PR @markap14! Thanks for the review @ottobackwards! This has been merged to master.
[jira] [Assigned] (NIFI-7423) Upgrade jquery version
[ https://issues.apache.org/jira/browse/NIFI-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

M Tien reassigned NIFI-7423:
----------------------------

    Assignee: M Tien

> Upgrade jquery version
> ----------------------
>
>                 Key: NIFI-7423
>                 URL: https://issues.apache.org/jira/browse/NIFI-7423
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 1.11.4
>            Reporter: M Tien
>            Assignee: M Tien
>            Priority: Major
>              Labels: dependency
>
> Upgrade dependency version of jquery.
[jira] [Created] (NIFI-7423) Upgrade jquery version
M Tien created NIFI-7423:
----------------------------

             Summary: Upgrade jquery version
                 Key: NIFI-7423
                 URL: https://issues.apache.org/jira/browse/NIFI-7423
             Project: Apache NiFi
          Issue Type: Improvement
          Components: Core Framework
    Affects Versions: 1.11.4
            Reporter: M Tien

Upgrade dependency version of jquery.
[jira] [Updated] (NIFI-7398) Upgrade jackson-databind version
[ https://issues.apache.org/jira/browse/NIFI-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

M Tien updated NIFI-7398:
-------------------------

    Status: Patch Available (was: In Progress)

> Upgrade jackson-databind version
> --------------------------------
>
>                 Key: NIFI-7398
>                 URL: https://issues.apache.org/jira/browse/NIFI-7398
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 1.11.4
>            Reporter: M Tien
>            Assignee: M Tien
>            Priority: Major
>              Labels: dependency
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Upgrade dependency version of jackson-databind.
[GitHub] [nifi] joewitt commented on pull request #4244: NIFI-7414: Escape user-defined values that contain invalid XML charac…
joewitt commented on pull request #4244: URL: https://github.com/apache/nifi/pull/4244#issuecomment-623599036

is worth plenty - thanks for sticking with him on this otto
[GitHub] [nifi] ottobackwards commented on pull request #4244: NIFI-7414: Escape user-defined values that contain invalid XML charac…
ottobackwards commented on pull request #4244: URL: https://github.com/apache/nifi/pull/4244#issuecomment-623598263

+1 fwiw pending ci-workflow completion
[GitHub] [nifi] markap14 commented on pull request #4244: NIFI-7414: Escape user-defined values that contain invalid XML charac…
markap14 commented on pull request #4244: URL: https://github.com/apache/nifi/pull/4244#issuecomment-623595257

Yeah, the Strings in the unit tests were definitely wrong. But for some reason the tests were passing in my IntelliJ. But after I did a full re-build, they started failing, as they should. Fixed now and pushed the update.
[jira] [Commented] (MINIFICPP-1214) Convert H2O Processors to use ALv2 compliant H20-3 library
[ https://issues.apache.org/jira/browse/MINIFICPP-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099149#comment-17099149 ]

Joe Witt commented on MINIFICPP-1214:
-------------------------------------

thanks

> Convert H2O Processors to use ALv2 compliant H20-3 library
> ----------------------------------------------------------
>
>                 Key: MINIFICPP-1214
>                 URL: https://issues.apache.org/jira/browse/MINIFICPP-1214
>             Project: Apache NiFi MiNiFi C++
>          Issue Type: Bug
>            Reporter: Marc Parisi
>            Assignee: James Medel
>            Priority: Blocker
>             Fix For: 0.8.0
>
> Per https://issues.apache.org/jira/browse/MINIFICPP-1201 We should resolve
> Joe's licensing concerns so we're on the up and up. We can either revert or
> mitigate this via using H20-3 (https://github.com/h2oai/h2o-3)
[jira] [Comment Edited] (MINIFICPP-1214) Convert H2O Processors to use ALv2 compliant H20-3 library
[ https://issues.apache.org/jira/browse/MINIFICPP-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099148#comment-17099148 ]

Marc Parisi edited comment on MINIFICPP-1214 at 5/4/20, 5:17 PM:
-----------------------------------------------------------------

[~joewitt] FYI in case you aren't monitoring tickets. I've made this a blocker as a follow-on; feel free to watch if you want to monitor this. I will continue to monitor this.

was (Author: phrocker):
[~joewitt] FYI in case you aren't monitoring tickets. I've made this a blocker as a follow on.

> Convert H2O Processors to use ALv2 compliant H20-3 library
> ----------------------------------------------------------
>
>                 Key: MINIFICPP-1214
>                 URL: https://issues.apache.org/jira/browse/MINIFICPP-1214
>             Project: Apache NiFi MiNiFi C++
>          Issue Type: Bug
>            Reporter: Marc Parisi
>            Assignee: James Medel
>            Priority: Blocker
>             Fix For: 0.8.0
>
> Per https://issues.apache.org/jira/browse/MINIFICPP-1201 We should resolve
> Joe's licensing concerns so we're on the up and up. We can either revert or
> mitigate this via using H20-3 (https://github.com/h2oai/h2o-3)
[jira] [Commented] (MINIFICPP-1214) Convert H2O Processors to use ALv2 compliant H20-3 library
[ https://issues.apache.org/jira/browse/MINIFICPP-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099148#comment-17099148 ]

Marc Parisi commented on MINIFICPP-1214:
----------------------------------------

[~joewitt] FYI in case you aren't monitoring tickets. I've made this a blocker as a follow on.

> Convert H2O Processors to use ALv2 compliant H20-3 library
> ----------------------------------------------------------
>
>                 Key: MINIFICPP-1214
>                 URL: https://issues.apache.org/jira/browse/MINIFICPP-1214
>             Project: Apache NiFi MiNiFi C++
>          Issue Type: Bug
>            Reporter: Marc Parisi
>            Assignee: James Medel
>            Priority: Blocker
>             Fix For: 0.8.0
>
> Per https://issues.apache.org/jira/browse/MINIFICPP-1201 We should resolve
> Joe's licensing concerns so we're on the up and up. We can either revert or
> mitigate this via using H20-3 (https://github.com/h2oai/h2o-3)
[jira] [Created] (MINIFICPP-1214) Convert H2O Processors to use ALv2 compliant H20-3 library
Marc Parisi created MINIFICPP-1214:
--------------------------------------

             Summary: Convert H2O Processors to use ALv2 compliant H20-3 library
                 Key: MINIFICPP-1214
                 URL: https://issues.apache.org/jira/browse/MINIFICPP-1214
             Project: Apache NiFi MiNiFi C++
          Issue Type: Bug
            Reporter: Marc Parisi
            Assignee: James Medel
             Fix For: 0.8.0

Per https://issues.apache.org/jira/browse/MINIFICPP-1201 We should resolve Joe's licensing concerns so we're on the up and up. We can either revert or mitigate this via using H20-3 (https://github.com/h2oai/h2o-3)
[jira] [Resolved] (MINIFICPP-1199) Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marc Parisi resolved MINIFICPP-1199. Resolution: Fixed

[~james.medel] I've closed this (finally – turns out my previous password reset didn't work, but LastPass changed it anyway!). Per [~joewitt]'s point, which I totally agree with, if you can get the H2O-3 follow-ups in, we can augment rather than revert.

> Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
>
> Key: MINIFICPP-1199
> URL: https://issues.apache.org/jira/browse/MINIFICPP-1199
> Project: Apache NiFi MiNiFi C++
> Issue Type: New Feature
> Affects Versions: master
> Environment: Ubuntu 18.04 in AWS EC2, MiNiFi C++ 0.7.0
> Reporter: James Medel
> Priority: Blocker
> Fix For: 0.8.0
>
> *MiNiFi C++ and H2O Driverless AI Integration* via custom Python processors: integrates MiNiFi C++ with H2O's Driverless AI by using Driverless AI's Python Scoring Pipeline and MiNiFi's custom Python processors. The processors execute the Python Scoring Pipeline scorer to do batch scoring and real-time scoring for one or more predicted labels on test data in the incoming flow file content. I would like to contribute my processors to MiNiFi C++ as a new feature.
>
> *3 custom Python processors* created for MiNiFi:
>
> *H2oPspScoreRealTime* - Executes H2O Driverless AI's Python Scoring Pipeline to do interactive (real-time) scoring on an individual row or list of test data within each incoming flow file. Uses H2O's open-source Datatable library to load the test data into a frame, then converts it to a pandas dataframe. Pandas converts the dataframe rows to a list of lists; since each flow file passing through this processor should have only one row, we extract the first list. That list is passed into Driverless AI's Python scorer.score() function to predict one or more labels, and the prediction is returned as a list. The number of predicted labels was specified when the user built the Python Scoring Pipeline in Driverless AI, so there is a property for the user to pass in one or more predicted label names to be used as the prediction header. I create a comma-separated string from the predicted header and predicted values: the header(s) on one line followed by a newline, then the value(s) on the next line followed by a newline. The string is written to the flow file content. Flow file attributes are added for the number of lists scored and for each predicted label name and its associated score. Finally, the flow file is transferred on a success relationship.
>
> *H2oPspScoreBatches* - Executes H2O Driverless AI's Python Scoring Pipeline to do batch scoring on a frame of data within each incoming flow file. Uses H2O's open-source Datatable library to load the test data into a frame; each frame from a flow file passing through this processor should have multiple rows. The frame is passed into Driverless AI's Python scorer.score_batch() function to predict one or more labels. The prediction is returned as a pandas dataframe, which is converted to a string so it can be written to the flow file content. Flow file attributes are added for the number of rows scored, and for the predicted label name and associated score of the first row in the frame. Finally, the flow file is transferred on a success relationship.
>
> *ConvertDsToCsv* - Converts the data source of an incoming flow file to CSV. Uses H2O's open-source Datatable library to load the data into a frame, then converts it to a pandas dataframe. Pandas converts the dataframe to CSV, without the dataframe index, and stores it in an in-memory text stream (StringIO). The CSV string is then read from the StringIO object with read() so it can be written to the flow file content. The flow file is transferred on a success relationship.
>
> *Hydraulic System Condition Monitoring* data used in the MiNiFi flow: the sensor test data I used in this integration comes from [Kaggle: Condition Monitoring of Hydraulic Systems|https://www.kaggle.com/jjacostupa/condition-monitoring-of-hydraulic-systems#description.txt]. I was able to predict hydraulic system cooling efficiency through the MiNiFi and H2O integration described above. The use case here is hydraulic system predictive maintenance.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
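The two text transformations described for H2oPspScoreRealTime and ConvertDsToCsv can be sketched with the standard library alone. This is an illustrative sketch, not the processor code: `format_realtime_prediction`, `rows_to_csv_string`, `fake_score`, and the column names are all hypothetical stand-ins, and the real processors use Datatable, pandas, and the licensed Driverless AI scorer rather than the stubs below.

```python
import csv
import io

def format_realtime_prediction(label_names, scores):
    # Two-line output described for H2oPspScoreRealTime: the predicted
    # header(s) on one comma-separated line ending in a newline, then the
    # predicted value(s) on the next line ending in a newline.
    header = ",".join(label_names) + "\n"
    values = ",".join(str(s) for s in scores) + "\n"
    return header + values

def rows_to_csv_string(header, rows):
    # ConvertDsToCsv-style conversion: write the rows into an in-memory
    # StringIO stream (no index column), then read the CSV text back out.
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    buf.seek(0)
    return buf.read()

# Stand-in for scorer.score(row); the real call comes from the licensed
# Driverless AI Python Scoring Pipeline and is not reproduced here.
fake_score = lambda row: [0.42]

print(format_realtime_prediction(["cool_cond_y"], fake_score([155.6, 35.4])))
print(rows_to_csv_string(["psa_bar", "tsa_deg"], [[155.6, 35.4], [104.9, 36.1]]))
```

In the actual processors these strings would be written back to the flow file content inside the session callback rather than printed.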
[jira] [Commented] (MINIFICPP-1201) Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099142#comment-17099142 ] Marc Parisi commented on MINIFICPP-1201:

[~james.medel] I've closed this (finally – turns out my previous password reset didn't work, but LastPass changed it anyway!). Per [~joewitt]'s point above, which I totally agree with, if you can get the H2O-3 follow-ups in, we can augment rather than revert.

> Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
>
> Key: MINIFICPP-1201
> URL: https://issues.apache.org/jira/browse/MINIFICPP-1201
> Project: Apache NiFi MiNiFi C++
> Issue Type: New Feature
> Affects Versions: master
> Environment: Ubuntu 18.04 in AWS EC2, MiNiFi C++ 0.7.0
> Reporter: James Medel
> Priority: Blocker
> Fix For: 0.8.0
>
> *MiNiFi C++ and H2O Driverless AI Integration* via a custom Python processor: integrates MiNiFi C++ with H2O Driverless AI by using Driverless AI's MOJO Scoring Pipeline (in the C++ Runtime Python Wrapper) and a MiNiFi custom Python processor. The processor executes the MOJO Scoring Pipeline to do batch or real-time scoring for one or more predicted labels on tabular test data in the incoming flow file content. If the tabular data is one row, the MOJO does real-time scoring; if it is multiple rows, the MOJO does batch scoring. I would like to contribute my processors to MiNiFi C++ as a new feature.
>
> *1 custom Python processor* created for MiNiFi:
>
> *H2oMojoPwScoring* - Executes H2O Driverless AI's MOJO Scoring Pipeline in the C++ Runtime Python Wrapper to do batch or real-time scoring on a frame of data within each incoming flow file. Requires the user to set the *pipeline.mojo* filepath in the "MOJO Pipeline Filepath" property. This property is read in the onTrigger(context, session) function to get the pipeline.mojo filepath, which is *passed into* the *daimojo.model(pipeline_mojo_filepath)* function to instantiate the *mojo_scorer*. The MOJO creation time and UUID are added as individual flow file attributes. The *flow file content* is then *loaded into a Datatable frame* to hold the test data. A Python lambda called compare checks whether the Datatable frame's header column names equal the expected header column names from the mojo scorer. This check is done because the Datatable frame could have a missing header; when the header does not equal the expected header, we update the frame's header with the mojo scorer's expected header. Having the correct header matters because the *mojo scorer's* *predict(datatable_frame)* function needs it, and predict then returns a predictions Datatable frame. The mojo scorer's predict function is *capable of real-time or batch scoring*; it just depends on the number of rows in the tabular data. The predictions frame is converted to a pandas dataframe so we can use pandas' to_string(index=False) to produce a string without the dataframe index. *The prediction string is then written to the flow file content*. A flow file attribute is added for the number of rows scored, plus one or more attributes for each predicted label name and its associated score. Finally, the flow file is transferred on a success relationship.
>
> *Hydraulic System Condition Monitoring* data used in the MiNiFi flow: the sensor test data I used in this integration comes from Kaggle: Condition Monitoring of Hydraulic Systems. I was able to predict hydraulic system cooling efficiency through the MiNiFi and H2O integration described above. The use case here is hydraulic system predictive maintenance.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
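The header check at the heart of H2oMojoPwScoring can be sketched without the licensed daimojo runtime by treating the frame header as a plain list. Everything below is an illustrative stand-in: `ensure_expected_header` and the column names are hypothetical, and in the real processor the expected names come from the instantiated mojo scorer rather than a literal list.

```python
def ensure_expected_header(frame_header, expected_header):
    # The 'compare' lambda from the description: does the incoming frame's
    # header match the feature names the mojo scorer expects?
    compare = lambda a, b: list(a) == list(b)
    # If not (e.g. the data arrived headerless, so the first data row was
    # read as the header), substitute the scorer's expected header so that
    # predict() sees the column names it needs.
    return list(frame_header) if compare(frame_header, expected_header) else list(expected_header)

expected = ["psa_bar", "psb_bar", "tsa_deg"]

# Header matches: kept as-is.
print(ensure_expected_header(["psa_bar", "psb_bar", "tsa_deg"], expected))

# Headerless data: the scorer's expected header is used instead.
print(ensure_expected_header(["155.6", "104.9", "35.4"], expected))
```

In the processor described above, this substitution happens on the Datatable frame in place, before mojo_scorer.predict() is called.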
[jira] [Resolved] (MINIFICPP-1201) Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marc Parisi resolved MINIFICPP-1201. Resolution: Fixed

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (MINIFICPP-1201) Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099139#comment-17099139 ] Marc Parisi commented on MINIFICPP-1201:

+1 Yeah, totally. Thanks for following up. I think in the e-mail chain it was becoming apparent that I wasn't correctly relaying that I think H2O-3 needed to be introduced as a corrective action.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (MINIFICPP-1201) Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099137#comment-17099137 ] Joe Witt commented on MINIFICPP-1201:

It doesn't have to be reverted to be right. Let's just make sure we quickly get master to the point we're confident it is clean from an L point of view. If that will happen right away, then a new commit is good; if it might take some time, then I'd recommend a revert. I still don't really understand precisely what the exposure is here from an L point of view, in terms of source/binary dependencies. But it seems sketchy, and it just needs to become not sketchy :)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (MINIFICPP-1199) Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099135#comment-17099135 ] Marc Parisi edited comment on MINIFICPP-1199 at 5/4/20, 5:03 PM:

[~joewitt] Gotcha. I think we're on the same page; I was likely confusing myself via the Apache e-mail chain and, by being too terse, probably confusing you. Thanks!

was (Author: phrocker): [~joewitt] gotcha I think we're on the same page and I was likely confusing myself via the Apache E-mail chain. Thanks!

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (MINIFICPP-1199) Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099135#comment-17099135 ] Marc Parisi commented on MINIFICPP-1199:

[~joewitt] Gotcha. I think we're on the same page; I was likely confusing myself via the Apache e-mail chain. Thanks!

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (MINIFICPP-1199) Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099132#comment-17099132 ] Joe Witt commented on MINIFICPP-1199:

[~phrocker] Yeah, if that makes what we're doing clean for L. It isn't critical to me that we revert - just that we end up in a good state.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (MINIFICPP-1201) Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099128#comment-17099128 ] Marc Parisi commented on MINIFICPP-1201: [~joewitt] per the E-mail discussion my opinion is that we can keep this with modifications with H2O-3 . that is, ultimately, [~james.medel] 's decision, so I'm happy to keep this open and revert, but per the other ticket if that code is compliant then could we create a follow on to resolve that? Or would your preference be to remove this then submit that anew? > Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ > Runtime Python Wrapper) To Do ML Inference on Edge > -- > > Key: MINIFICPP-1201 > URL: https://issues.apache.org/jira/browse/MINIFICPP-1201 > Project: Apache NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: master > Environment: Ubuntu 18.04 in AWS EC2 > MiNiFi C++ 0.7.0 >Reporter: James Medel >Priority: Blocker > Fix For: 0.8.0 > > > *MiNiFi C++ and H2O Driverless AI Integration* via Custom Python Processors: > Integrates MiNiFi C++ with H2O Driverless AI by using Driverless AI's MOJO > Scoring Pipeline (in C++ Runtime Python Wrapper) and MiNiFi's Custom Python > Processor. Uses a Python Processor to execute the MOJO Scoring Pipeline to do > batch scoring or real-time scoring for one or more predicted labels on > tabular test data in the incoming flow file content. If the tabular data is > one row, then the MOJO does real-time scoring. If the tabular data is > multiple rows, then the MOJO does batch scoring. I would like to contribute > my processors to MiNiFi C++ as a new feature. > *1 custom python processor* created for MiNiFi: > *H2oMojoPwScoring* - Executes H2O Driverless AI's MOJO Scoring Pipeline in > C++ Runtime Python Wrapper to do batch scoring or real-time scoring on a > frame of data within each incoming flow file. Requires the user to add the > *pipeline.mojo* filepath into the "MOJO Pipeline Filepath" property. 
This > property is used in the onTrigger(context, session) function to get the > pipeline.mojo filepath, so we can *pass it into* the > *daimojo.model(pipeline_mojo_filepath)* function to instantiate our > *mojo_scorer*. MOJO creation time and uuid are added as individual flow file > attributes. Then the *flow file content* is *loaded into Datatable* *frame* > to hold the test data. Then a Python lambda function called compare is used > to compare whether the datatable frame header column names equals the > expected header column names from the mojo scorer. This check is done because > the datatable frame could have a missing header, which is true when the > header does not equal the expected header and so we update the datatable > frame header with the mojo scorer's expected header. Having the correct > header works nicely because the *mojo scorer's* *predict(datatable_frame)* > function needs the header and then does the prediction returning a > predictions datatable frame. The mojo scorer's predict function is *capable > of doing real-time scoring or batch scoring*, it just depends on the amount > of rows that the tabular data has. This predictions datatable frame is then > converted to pandas dataframe, so we can use pandas' to_string(index=False) > function to convert the dataframe to a string without the dataframe's index. > Then *the prediction string is written to flow file content*. A flow file > attribute is added for the number of rows scored. Another one or more flow > file attributes are added for the predicted label name and its associated > score. Finally, the flow file is transferred on a success relationship. > > *Hydraulic System Condition Monitoring* Data used in MiNiFi Flow: > The sensor test data I used in this integration comes from Kaggle: Condition > Monitoring of Hydraulic Systems. I was able to predict hydraulic system > cooling efficiency through MiNiFi and H2O integration described above. 
The > use case here is hydraulic system predictive maintenance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
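The header check described above for H2oMojoPwScoring (compare the incoming frame's column names against the mojo scorer's expected columns, and substitute the expected header when they differ) can be sketched in plain Python. Note this is an illustrative sketch only: the real processor reads the frame via H2O's Datatable library and gets the expected columns from the `daimojo` mojo scorer, both of which are stubbed with plain lists here.

```python
# Illustrative sketch of the header-repair step described for
# H2oMojoPwScoring. In the real processor, frame_header would come from a
# datatable Frame loaded from flow file content, and expected_header from
# the daimojo mojo scorer; plain lists stand in for both (assumptions).

def repair_header(frame_header, expected_header):
    """Return the header the mojo scorer should see.

    If the incoming frame's column names already match the scorer's
    expected names, keep them; otherwise treat the frame as having
    arrived headerless and substitute the scorer's expected header.
    """
    # The description uses a lambda named `compare` for this equality check.
    compare = lambda got, want: list(got) == list(want)
    if compare(frame_header, expected_header):
        return list(frame_header)
    return list(expected_header)

# Hypothetical column names for illustration only.
expected = ["psa_bar", "psb_bar", "tsa_temp"]
print(repair_header(["psa_bar", "psb_bar", "tsa_temp"], expected))
print(repair_header(["C0", "C1", "C2"], expected))  # headerless frame case
```

With the repaired header in place, the scorer's `predict(datatable_frame)` call described above can rely on the column names it expects.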
[GitHub] [nifi] joewitt commented on pull request #4242: NIFI-7411: Integrate H2O Driverless AI MSP in NiFi
joewitt commented on pull request #4242: URL: https://github.com/apache/nifi/pull/4242#issuecomment-623581676 The library h2o-3 using ALv2 is a good start. But if we do end up including that with NiFi we need to concern ourselves with the License and Notice considerations of how it is included in *any* form. For instance we need to understand the difference between a source dependency and binary dependencies. Our source is the 'thing we vote on and officially release'. It is java class files and text files and poms/etc... It is not jars and so on. Jars are part of the binary dependencies and these we also share in the form of a convenience binary. This is the tar.gz which people can download and untar/decompress and run nifi with. We must adhere to ASF requirements for the types of open source licenses which are allowed. So again in the h2o-3 case we aren't apparently planning to pull in source so no problem. But let's say we're going to pull in h2o-3.jar. That must be ALv2 or an otherwise Category A or Category B item. However, we need to ensure grabbing h2o-3.jar doesn't also cause us to pull 'problematic-licensed-dependency.jar' too. We have to check *every* single jar/etc. It is a lot of work to get right and important that we do. So just keep that in mind for any subsequent PRs. Thanks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (MINIFICPP-1199) Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099121#comment-17099121 ] Marc Parisi commented on MINIFICPP-1199: I'm back in! [~james.medel] per [~joewitt] 's email chain will the H2O-3 library resolve his concerns? [~joewitt] would you be okay with a separate ticket to convert those lines you mentioned via E-mail to use the ASLv2 licensed library? > Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML > Inference on Edge > --- > > Key: MINIFICPP-1199 > URL: https://issues.apache.org/jira/browse/MINIFICPP-1199 > Project: Apache NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: master > Environment: Ubuntu 18.04 in AWS EC2 > MiNiFi C++ 0.7.0 >Reporter: James Medel >Priority: Blocker > Fix For: 0.8.0 > > > *MiNiFi C++ and H2O Driverless AI Integration* via Custom Python Processors: > Integrates MiNiFi C++ with H2O's Driverless AI by Using Driverless AI's > Python Scoring Pipeline and MiNiFi's Custom Python Processors. Uses the > Python Processors to execute the Python Scoring Pipeline scorer to do batch > scoring and real-time scoring for one or more predicted labels on test data > in the incoming flow file content. I would like to contribute my processors > to MiNiFi C++ as a new feature. > > *3 custom python processors* created for MiNiFi: > *H2oPspScoreRealTime* - Executes H2O Driverless AI's Python Scoring Pipeline > to do interactive scoring (real-time) scoring on an individual row or list of > test data within each incoming flow file. Uses H2O's open-source Datatable > library to load test data into a frame, then converts it to pandas dataframe. > Pandas is used to convert the pandas dataframe rows to a list of lists, but > since each flow file passing through this processor should have only 1 row, > we extract the 1st list. Then that list is passed into the Driverless AI's > Python scorer.score() function to predict one or more predicted labels. 
The > prediction is returned to a list. The number of predicted labels is specified > when the user built the Python Scoring Pipeline in Driverless AI. With that > knowledge, there is a property for the user to pass in one or more predicted > label names that will be used as the predicted header. I create a comma > separated string using the predicted header and predicted value. The > predicted header(s) is on one line followed by a newline and the predicted > value(s) is on the next line followed by a newline. The string is written to > the flow file content. Flow File attributes are added to the flow file for > the number of lists scored and the predicted label name and its associated > score. Finally, the flow file is transferred on a success relationship. > > *H2oPspScoreBatches* - Executes H2O Driverless AI's Python Scoring Pipeline > to do batch scoring on a frame of data within each incoming flow file. Uses > H2O's open-source Datatable library to load test data into a frame. Each > frame from the flow file passing through this processor should have multiple > rows. That frame is passed into the Driverless AI's Python > scorer.score_batch() function to predict one or more predicted labels. The > prediction is returned to a pandas dataframe, then that dataframe is > converted to a string, so it can be written to the flow file content. Flow > File attributes are added to the flow file for the number of rows scored. > There are also flow file attributes added for the predicted label name and > its associated score for the first row in the frame. Finally, the flow file > is transferred on a success relationship. > > *ConvertDsToCsv* - Converts data source of incoming flow file to csv. Uses > H2O's open-source Datatable library to load data into a frame, then converts > it to pandas dataframe. Pandas is used to convert the pandas dataframe to a > csv and store it into in-memory text stream StringIO without pandas dataframe > index. 
The csv string data is grabbed using the file read() function on the > StringIO object, so it can be written to the flow file content. The flow file > is transferred on a success relationship. > > *Hydraulic System Condition Monitoring* Data used in MiNiFi Flow: > The sensor test data I used in this integration comes from [Kaggle: Condition > Monitoring of Hydraulic > Systems|https://www.kaggle.com/jjacostupa/condition-monitoring-of-hydraulic-systems#description.txt]. > I was able to predict hydraulic system cooling efficiency through the MiNiFi and > H2O integration described above. The use case here is hydraulic system > predictive maintenance. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
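The flow file content layout described above for H2oPspScoreRealTime (comma-separated predicted header names on one line, the comma-separated predicted values on the next, each newline-terminated) can be sketched without the Driverless AI pipeline. The scorer call is stubbed here, since `scorer.score()` is only available inside a generated Python Scoring Pipeline; only the string-building step is shown.

```python
# Sketch of the two-line, comma-separated output format described for
# H2oPspScoreRealTime. The real processor obtains `scores` from Driverless
# AI's scorer.score(row); a hard-coded list stands in for it here.

def format_prediction(predicted_labels, scores):
    """Build flow file content: header line, then value line, each ending
    in a newline, as described in the processor write-up."""
    header_line = ",".join(predicted_labels) + "\n"
    value_line = ",".join(str(s) for s in scores) + "\n"
    return header_line + value_line

# Hypothetical predicted label names and scores for illustration only.
content = format_prediction(["cool_cond_y.3", "cool_cond_y.20"], [0.12, 0.88])
print(content)
```

The batch-scoring variant (H2oPspScoreBatches) instead converts the returned pandas dataframe to a string in one step, so this per-row formatting applies only to the real-time path.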
[GitHub] [nifi] ottobackwards commented on pull request #4244: NIFI-7414: Escape user-defined values that contain invalid XML charac…
ottobackwards commented on pull request #4244: URL: https://github.com/apache/nifi/pull/4244#issuecomment-623572437 Great, the tests look like they are failing though This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi] james94 commented on pull request #4242: NIFI-7411: Integrate H2O Driverless AI MSP in NiFi
james94 commented on pull request #4242: URL: https://github.com/apache/nifi/pull/4242#issuecomment-623566815 @joewitt Thank you for sharing your perspective. Also thank you for your advice on the processor name and code changes, I have incorporated them into the code. **h2o-3** uses the **Apache 2.0 license**: https://github.com/h2oai/h2o-3/blob/master/LICENSE. I see **NiFi** and **MiNiFi C++** use the **Apache 2.0 license**. About your question on this pull request for **NIFI-7411, it can be closed.** I will close it today. Also, I am moving the NiFi processor code over to h2o **dai-deployment-examples** repository: https://github.com/h2oai/dai-deployment-examples/pull/18 I will move the MiNiFi C++ Python processors that use H2O Driverless AI Mojo and Python Scoring Pipeline over to the **dai-deployment-examples** repo too. I will open a separate Jira for MiNiFi C++ about this action. Maintainability by the community: I would like to become a NiFi and MiNiFi C++ contributor. So, my intent would be to help out with the components I create for both these projects. From now on, if I contribute a new component to NiFi and/or MiNiFi C++, I will make sure it uses a compatible license, such as **Apache 2.0 license** and that it abides by the NiFi license guide you shared: http://nifi.apache.org/licensing-guide.html. Since **h2o-3** uses the **Apache 2.0 license**, I would like to contribute a new NiFi and MiNiFiCPP processor that uses **h2o-3** in a future pull request. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (NIFI-7422) Support aws_s3_pseudo_dir in Atlas reporting task
[ https://issues.apache.org/jira/browse/NIFI-7422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Turcsanyi updated NIFI-7422: -- Component/s: Extensions > Support aws_s3_pseudo_dir in Atlas reporting task > - > > Key: NIFI-7422 > URL: https://issues.apache.org/jira/browse/NIFI-7422 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Peter Turcsanyi >Assignee: Peter Turcsanyi >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (NIFI-7422) Support aws_s3_pseudo_dir in Atlas reporting task
Peter Turcsanyi created NIFI-7422: - Summary: Support aws_s3_pseudo_dir in Atlas reporting task Key: NIFI-7422 URL: https://issues.apache.org/jira/browse/NIFI-7422 Project: Apache NiFi Issue Type: Improvement Reporter: Peter Turcsanyi Assignee: Peter Turcsanyi -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [nifi-minifi-cpp] phrocker commented on pull request #766: MINIFI-1201: Integrate H2O Driverless AI MSP in MiNFi
phrocker commented on pull request #766: URL: https://github.com/apache/nifi-minifi-cpp/pull/766#issuecomment-623565229 @james94 If so can you take a look at the apache thread and chime in RE H2O-3 to ensure things here should be kept. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] phrocker commented on pull request #766: MINIFI-1201: Integrate H2O Driverless AI MSP in MiNFi
phrocker commented on pull request #766: URL: https://github.com/apache/nifi-minifi-cpp/pull/766#issuecomment-623563806 @james94 I wanted to confirm something. You mentioned H2O-3 in regards to the python processors for MiNiFi C++ (https://github.com/apache/nifi-minifi-cpp/blob/master/extensions/pythonprocessors/h2o/dai/msp/H2oMojoPwScoring.py)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (MINIFICPP-1151) Eliminate easy-to-fix compiler warnings
[ https://issues.apache.org/jira/browse/MINIFICPP-1151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arpad Boda updated MINIFICPP-1151: -- Fix Version/s: 0.8.0 > Eliminate easy-to-fix compiler warnings > --- > > Key: MINIFICPP-1151 > URL: https://issues.apache.org/jira/browse/MINIFICPP-1151 > Project: Apache NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: Marton Szasz >Assignee: Marton Szasz >Priority: Minor > Fix For: 0.8.0 > > Time Spent: 2.5h > Remaining Estimate: 0h > > Most of our warnings are easy to fix. The goal of this issue is to gain the > most value for a reasonably small effort investment. > Optionally increase the warning level of compilation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (MINIFICPP-1194) MiNiFi-DOCS - Fix Table Formatting for 2 Execute.* Processors
[ https://issues.apache.org/jira/browse/MINIFICPP-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aldrin Piri updated MINIFICPP-1194: --- Fix Version/s: 0.8.0 > MiNiFi-DOCS - Fix Table Formatting for 2 Execute.* Processors > - > > Key: MINIFICPP-1194 > URL: https://issues.apache.org/jira/browse/MINIFICPP-1194 > Project: Apache NiFi MiNiFi C++ > Issue Type: Documentation >Affects Versions: master > Environment: Ubuntu 18.04 LTS in AWS EC2 >Reporter: James Medel >Priority: Major > Fix For: 0.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Table formatting issues were making it difficult to read properties for > ExecutePythonProcessor and ExecuteScript. So, this change in markdown > resolves the formatting issues. > Doc affected: > [https://github.com/apache/nifi-minifi-cpp/blob/master/PROCESSORS.md#executescript] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (MINIFICPP-1195) MiNiFiCPP-DOCS: Updates README Outdated Bootstrapping Info
[ https://issues.apache.org/jira/browse/MINIFICPP-1195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aldrin Piri updated MINIFICPP-1195: --- Fix Version/s: (was: master) 0.8.0 > MiNiFiCPP-DOCS: Updates README Outdated Bootstrapping Info > -- > > Key: MINIFICPP-1195 > URL: https://issues.apache.org/jira/browse/MINIFICPP-1195 > Project: Apache NiFi MiNiFi C++ > Issue Type: Documentation >Affects Versions: master > Environment: Ubuntu 18.04 LTS on AWS EC2 >Reporter: James Medel >Priority: Major > Fix For: 0.8.0 > > > The paragraph in the Getting Started: Bootstrapping section originally told > the user to enter N to continue the build process, but with the updated menu > guided bootstrap process displayed in the console, P is what one must enter > to continue the build process. I updated that info. > The previous menu guided bootstrap process was outdated and did not show the > new features Bustache Support to SQL Support and the Build Options. So, I > updated it to show that information. > > Here is my pull request with the updates: > [https://github.com/apache/nifi-minifi-cpp/pull/760] > Here is the doc that I am proposing updates for: > [https://github.com/apache/nifi-minifi-cpp/blob/master/README.md] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [nifi] markap14 commented on pull request #4244: NIFI-7414: Escape user-defined values that contain invalid XML charac…
markap14 commented on pull request #4244: URL: https://github.com/apache/nifi/pull/4244#issuecomment-623538477 Sure, updated unit tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (MINIFICPP-1201) Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arpad Boda updated MINIFICPP-1201: -- Fix Version/s: (was: master) 0.8.0 > Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ > Runtime Python Wrapper) To Do ML Inference on Edge > -- > > Key: MINIFICPP-1201 > URL: https://issues.apache.org/jira/browse/MINIFICPP-1201 > Project: Apache NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: master > Environment: Ubuntu 18.04 in AWS EC2 > MiNiFi C++ 0.7.0 >Reporter: James Medel >Priority: Blocker > Fix For: 0.8.0 > > > *MiNiFi C++ and H2O Driverless AI Integration* via Custom Python Processors: > Integrates MiNiFi C++ with H2O Driverless AI by using Driverless AI's MOJO > Scoring Pipeline (in C++ Runtime Python Wrapper) and MiNiFi's Custom Python > Processor. Uses a Python Processor to execute the MOJO Scoring Pipeline to do > batch scoring or real-time scoring for one or more predicted labels on > tabular test data in the incoming flow file content. If the tabular data is > one row, then the MOJO does real-time scoring. If the tabular data is > multiple rows, then the MOJO does batch scoring. I would like to contribute > my processors to MiNiFi C++ as a new feature. > *1 custom python processor* created for MiNiFi: > *H2oMojoPwScoring* - Executes H2O Driverless AI's MOJO Scoring Pipeline in > C++ Runtime Python Wrapper to do batch scoring or real-time scoring on a > frame of data within each incoming flow file. Requires the user to add the > *pipeline.mojo* filepath into the "MOJO Pipeline Filepath" property. This > property is used in the onTrigger(context, session) function to get the > pipeline.mojo filepath, so we can *pass it into* the > *daimojo.model(pipeline_mojo_filepath)* function to instantiate our > *mojo_scorer*. MOJO creation time and uuid are added as individual flow file > attributes. 
Then the *flow file content* is *loaded into Datatable* *frame* > to hold the test data. Then a Python lambda function called compare is used > to compare whether the datatable frame header column names equals the > expected header column names from the mojo scorer. This check is done because > the datatable frame could have a missing header, which is true when the > header does not equal the expected header and so we update the datatable > frame header with the mojo scorer's expected header. Having the correct > header works nicely because the *mojo scorer's* *predict(datatable_frame)* > function needs the header and then does the prediction returning a > predictions datatable frame. The mojo scorer's predict function is *capable > of doing real-time scoring or batch scoring*, it just depends on the amount > of rows that the tabular data has. This predictions datatable frame is then > converted to pandas dataframe, so we can use pandas' to_string(index=False) > function to convert the dataframe to a string without the dataframe's index. > Then *the prediction string is written to flow file content*. A flow file > attribute is added for the number of rows scored. Another one or more flow > file attributes are added for the predicted label name and its associated > score. Finally, the flow file is transferred on a success relationship. > > *Hydraulic System Condition Monitoring* Data used in MiNiFi Flow: > The sensor test data I used in this integration comes from Kaggle: Condition > Monitoring of Hydraulic Systems. I was able to predict hydraulic system > cooling efficiency through MiNiFi and H2O integration described above. This > use case here is hydraulic system predictive maintenance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (MINIFICPP-1199) Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arpad Boda updated MINIFICPP-1199: -- Fix Version/s: (was: master) 0.8.0 > Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML > Inference on Edge > --- > > Key: MINIFICPP-1199 > URL: https://issues.apache.org/jira/browse/MINIFICPP-1199 > Project: Apache NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: master > Environment: Ubuntu 18.04 in AWS EC2 > MiNiFi C++ 0.7.0 >Reporter: James Medel >Priority: Blocker > Fix For: 0.8.0 > > > *MiNiFi C++ and H2O Driverless AI Integration* via Custom Python Processors: > Integrates MiNiFi C++ with H2O's Driverless AI by Using Driverless AI's > Python Scoring Pipeline and MiNiFi's Custom Python Processors. Uses the > Python Processors to execute the Python Scoring Pipeline scorer to do batch > scoring and real-time scoring for one or more predicted labels on test data > in the incoming flow file content. I would like to contribute my processors > to MiNiFi C++ as a new feature. > > *3 custom python processors* created for MiNiFi: > *H2oPspScoreRealTime* - Executes H2O Driverless AI's Python Scoring Pipeline > to do interactive scoring (real-time) scoring on an individual row or list of > test data within each incoming flow file. Uses H2O's open-source Datatable > library to load test data into a frame, then converts it to pandas dataframe. > Pandas is used to convert the pandas dataframe rows to a list of lists, but > since each flow file passing through this processor should have only 1 row, > we extract the 1st list. Then that list is passed into the Driverless AI's > Python scorer.score() function to predict one or more predicted labels. The > prediction is returned to a list. The number of predicted labels is specified > when the user built the Python Scoring Pipeline in Driverless AI. 
With that > knowledge, there is a property for the user to pass in one or more predicted > label names that will be used as the predicted header. I create a comma > separated string using the predicted header and predicted value. The > predicted header(s) is on one line followed by a newline and the predicted > value(s) is on the next line followed by a newline. The string is written to > the flow file content. Flow File attributes are added to the flow file for > the number of lists scored and the predicted label name and its associated > score. Finally, the flow file is transferred on a success relationship. > > *H2oPspScoreBatches* - Executes H2O Driverless AI's Python Scoring Pipeline > to do batch scoring on a frame of data within each incoming flow file. Uses > H2O's open-source Datatable library to load test data into a frame. Each > frame from the flow file passing through this processor should have multiple > rows. That frame is passed into the Driverless AI's Python > scorer.score_batch() function to predict one or more predicted labels. The > prediction is returned to a pandas dataframe, then that dataframe is > converted to a string, so it can be written to the flow file content. Flow > File attributes are added to the flow file for the number of rows scored. > There are also flow file attributes added for the predicted label name and > its associated score for the first row in the frame. Finally, the flow file > is transferred on a success relationship. > > *ConvertDsToCsv* - Converts data source of incoming flow file to csv. Uses > H2O's open-source Datatable library to load data into a frame, then converts > it to pandas dataframe. Pandas is used to convert the pandas dataframe to a > csv and store it into in-memory text stream StringIO without pandas dataframe > index. The csv string data is grabbed using file read() function on the > StringIO object, so it can be written to the flow file content. 
The flow file > is transferred on a success relationship. > > *Hydraulic System Condition Monitoring* Data used in MiNiFi Flow: > The sensor test data I used in this integration comes from [Kaggle: Condition > Monitoring of Hydraulic > Systems|https://www.kaggle.com/jjacostupa/condition-monitoring-of-hydraulic-systems#description.txt]. > I was able to predict hydraulic system cooling efficiency through the MiNiFi and > H2O integration described above. The use case here is hydraulic system > predictive maintenance. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (MINIFICPP-1151) Eliminate easy-to-fix compiler warnings
[ https://issues.apache.org/jira/browse/MINIFICPP-1151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marton Szasz updated MINIFICPP-1151: Resolution: Fixed Status: Resolved (was: Patch Available) > Eliminate easy-to-fix compiler warnings > --- > > Key: MINIFICPP-1151 > URL: https://issues.apache.org/jira/browse/MINIFICPP-1151 > Project: Apache NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: Marton Szasz >Assignee: Marton Szasz >Priority: Minor > Time Spent: 2.5h > Remaining Estimate: 0h > > Most of our warnings are easy to fix. The goal of this issue is to gain the > most value for a reasonably small effort investment. > Optionally increase the warning level of compilation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (MINIFICPP-1151) Eliminate easy-to-fix compiler warnings
[ https://issues.apache.org/jira/browse/MINIFICPP-1151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marton Szasz closed MINIFICPP-1151. --- > Eliminate easy-to-fix compiler warnings > --- > > Key: MINIFICPP-1151 > URL: https://issues.apache.org/jira/browse/MINIFICPP-1151 > Project: Apache NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: Marton Szasz >Assignee: Marton Szasz >Priority: Minor > Time Spent: 2.5h > Remaining Estimate: 0h > > Most of our warnings are easy to fix. The goal of this issue is to gain the > most value for a reasonably small effort investment. > Optionally increase the warning level of compilation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
szaszm commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419481789 ## File path: libminifi/include/utils/Void_t.h ## @@ -0,0 +1,17 @@ +#pragma once Review comment: I feel it a bit of overkill to have a separate file for such a tiny utility. I would just put `void_t` in GeneralUtils.h next to the other polyfills. ## File path: libminifi/include/utils/Void_t.h ## @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +// ppartial detection idiom impl Review comment: `void_t` is used by, but not really part of the detection idiom. Could you move the comment line back to StringUtils.h? I forgot to add a reference to cppreference.com in the original. When moving it back, could you please append `, from cppreference.com` to the comment and fix the typo (`s/ppartial/partial/`)? 
So my ask summarized: - remove this comment line - Add the line below to where it was removed from: ``` // partial detection idiom impl, from cppreference.com ``` ## File path: libminifi/include/utils/TryMoveCall.h ## @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include // NOLINT + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +// TryMoveCall calls an +// - unary function of a lvalue reference-type argument by passing a ref +// - unary function of any other argument type by moving into it +template +struct TryMoveCall { Review comment: If we decouple this from the queue, we should consider making it handle return values (other than `void`). ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -21,20 +21,23 @@ #include #include #include +#include #include +#include Review comment: `` is no longer used since `TryMoveCall` and `void_t` were extracted. 
Please fix headers to follow the `include-what-you-use` rationale: https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#sf11-header-files-should-be-self-contained If `utils/TryMoveCall.h` were not self-contained, following iwyu would more likely expose the issue in the form of a compilation failure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (MINIFICPP-1199) Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099003#comment-17099003 ] Joe Witt commented on MINIFICPP-1199:
- tagging as a blocker since the commit has been merged and yet licensing remains a concern

> Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
> ---
>
> Key: MINIFICPP-1199
> URL: https://issues.apache.org/jira/browse/MINIFICPP-1199
> Project: Apache NiFi MiNiFi C++
> Issue Type: New Feature
> Affects Versions: master
> Environment: Ubuntu 18.04 in AWS EC2
> MiNiFi C++ 0.7.0
> Reporter: James Medel
> Priority: Blocker
> Fix For: master
>
> *MiNiFi C++ and H2O Driverless AI Integration* via Custom Python Processors: Integrates MiNiFi C++ with H2O's Driverless AI by using Driverless AI's Python Scoring Pipeline and MiNiFi's custom Python processors. Uses the Python processors to execute the Python Scoring Pipeline scorer to do batch scoring and real-time scoring for one or more predicted labels on test data in the incoming flow file content. I would like to contribute my processors to MiNiFi C++ as a new feature.
>
> *3 custom python processors* created for MiNiFi:
> *H2oPspScoreRealTime* - Executes H2O Driverless AI's Python Scoring Pipeline to do interactive (real-time) scoring on an individual row or list of test data within each incoming flow file. Uses H2O's open-source Datatable library to load test data into a frame, then converts it to a pandas dataframe. Pandas is used to convert the pandas dataframe rows to a list of lists, but since each flow file passing through this processor should have only 1 row, we extract the 1st list. Then that list is passed into Driverless AI's Python scorer.score() function to predict one or more predicted labels. The prediction is returned as a list. The number of predicted labels is specified when the user builds the Python Scoring Pipeline in Driverless AI. With that knowledge, there is a property for the user to pass in one or more predicted label names that will be used as the predicted header. I create a comma-separated string using the predicted header and predicted value. The predicted header(s) is on one line followed by a newline, and the predicted value(s) is on the next line followed by a newline. The string is written to the flow file content. Flow file attributes are added to the flow file for the number of lists scored and for the predicted label name and its associated score. Finally, the flow file is transferred on a success relationship.
>
> *H2oPspScoreBatches* - Executes H2O Driverless AI's Python Scoring Pipeline to do batch scoring on a frame of data within each incoming flow file. Uses H2O's open-source Datatable library to load test data into a frame. Each frame from the flow file passing through this processor should have multiple rows. That frame is passed into Driverless AI's Python scorer.score_batch() function to predict one or more predicted labels. The prediction is returned as a pandas dataframe, which is then converted to a string so it can be written to the flow file content. Flow file attributes are added to the flow file for the number of rows scored. There are also flow file attributes added for the predicted label name and its associated score for the first row in the frame. Finally, the flow file is transferred on a success relationship.
>
> *ConvertDsToCsv* - Converts the data source of an incoming flow file to CSV. Uses H2O's open-source Datatable library to load data into a frame, then converts it to a pandas dataframe. Pandas is used to convert the pandas dataframe to CSV and store it in the in-memory text stream StringIO without the pandas dataframe index. The CSV string data is read back out with the read() function on the StringIO object, so it can be written to the flow file content. The flow file is transferred on a success relationship.
>
> *Hydraulic System Condition Monitoring* data used in the MiNiFi flow:
> The sensor test data I used in this integration comes from [Kaggle: Condition Monitoring of Hydraulic Systems|https://www.kaggle.com/jjacostupa/condition-monitoring-of-hydraulic-systems#description.txt]. I was able to predict hydraulic system cooling efficiency through the MiNiFi and H2O integration described above. The use case here is hydraulic system predictive maintenance.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
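To make the H2oPspScoreRealTime output format above concrete — predicted header(s) on one line, predicted value(s) on the next, each followed by a newline — here is a minimal stdlib-only Python sketch. The scorer stub and the `cool_cond_y` label name are illustrative stand-ins, not H2O's actual API; the real processor calls Driverless AI's scorer.score() from the licensed Python Scoring Pipeline.

```python
import io

def format_prediction(predicted_headers, predicted_values):
    """Build the flow file content described above: a comma-separated
    predicted-header line, then a comma-separated predicted-value line,
    each followed by a newline."""
    out = io.StringIO()
    out.write(",".join(predicted_headers) + "\n")
    out.write(",".join(str(v) for v in predicted_values) + "\n")
    return out.getvalue()

def stub_score(row):
    """Stand-in for Driverless AI's scorer.score(row) -> list of predictions."""
    return [0.42]

row = [155.6, 9.2, 1.07]  # one row of test data (hypothetical sensor values)
content = format_prediction(["cool_cond_y"], stub_score(row))
print(content)  # -> "cool_cond_y\n0.42\n"
```

In the real processor this string becomes the outgoing flow file's content, and the label/score pair is additionally attached as flow file attributes.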
[jira] [Updated] (MINIFICPP-1201) Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Witt updated MINIFICPP-1201:
Priority: Blocker (was: Major)

> Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
> --
>
> Key: MINIFICPP-1201
> URL: https://issues.apache.org/jira/browse/MINIFICPP-1201
> Project: Apache NiFi MiNiFi C++
> Issue Type: New Feature
> Affects Versions: master
> Environment: Ubuntu 18.04 in AWS EC2
> MiNiFi C++ 0.7.0
> Reporter: James Medel
> Priority: Blocker
> Fix For: master
>
> *MiNiFi C++ and H2O Driverless AI Integration* via Custom Python Processors: Integrates MiNiFi C++ with H2O Driverless AI by using Driverless AI's MOJO Scoring Pipeline (in the C++ Runtime Python Wrapper) and a MiNiFi custom Python processor. Uses a Python processor to execute the MOJO Scoring Pipeline to do batch scoring or real-time scoring for one or more predicted labels on tabular test data in the incoming flow file content. If the tabular data is one row, the MOJO does real-time scoring; if it is multiple rows, the MOJO does batch scoring. I would like to contribute my processors to MiNiFi C++ as a new feature.
>
> *1 custom python processor* created for MiNiFi:
> *H2oMojoPwScoring* - Executes H2O Driverless AI's MOJO Scoring Pipeline in the C++ Runtime Python Wrapper to do batch scoring or real-time scoring on a frame of data within each incoming flow file. Requires the user to add the *pipeline.mojo* filepath into the "MOJO Pipeline Filepath" property. This property is used in the onTrigger(context, session) function to get the pipeline.mojo filepath, so we can *pass it into* the *daimojo.model(pipeline_mojo_filepath)* function to instantiate our *mojo_scorer*. MOJO creation time and UUID are added as individual flow file attributes. Then the *flow file content* is *loaded into a Datatable frame* to hold the test data. Then a Python lambda function called compare is used to check whether the datatable frame's header column names equal the expected header column names from the mojo scorer. This check is done because the datatable frame could be missing its header; when the header does not equal the expected header, we update the datatable frame header with the mojo scorer's expected header. Having the correct header works nicely because the *mojo scorer's* *predict(datatable_frame)* function needs the header and then does the prediction, returning a predictions datatable frame. The mojo scorer's predict function is *capable of doing real-time scoring or batch scoring*, depending on the number of rows in the tabular data. This predictions datatable frame is then converted to a pandas dataframe, so we can use pandas' to_string(index=False) function to convert the dataframe to a string without the dataframe's index. Then *the prediction string is written to flow file content*. A flow file attribute is added for the number of rows scored. One or more additional flow file attributes are added for the predicted label name and its associated score. Finally, the flow file is transferred on a success relationship.
>
> *Hydraulic System Condition Monitoring* data used in the MiNiFi flow:
> The sensor test data I used in this integration comes from Kaggle: Condition Monitoring of Hydraulic Systems. I was able to predict hydraulic system cooling efficiency through the MiNiFi and H2O integration described above. The use case here is hydraulic system predictive maintenance.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
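The header check described above reduces to a few lines of Python. A sketch under stated assumptions: the feature names are hypothetical, and the only behavior taken from the description is that a compare lambda matches the frame's column names against the mojo scorer's expected names and, on mismatch (e.g. a headerless frame, which datatable names C0, C1, …), adopts the expected names.

```python
def ensure_expected_header(frame_names, expected_names):
    """If the frame's column names don't equal the scorer's expected
    feature names, replace them with the expected names so that
    mojo_scorer.predict() sees the header it requires."""
    compare = lambda got, want: list(got) == list(want)
    if not compare(frame_names, expected_names):
        return list(expected_names)  # frame arrived headerless (or mislabeled)
    return list(frame_names)

# A headerless datatable frame gets default column names C0, C1, ...
print(ensure_expected_header(["C0", "C1"], ["psa_bar", "psb_bar"]))
# -> ['psa_bar', 'psb_bar']
```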
[jira] [Commented] (MINIFICPP-1201) Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099004#comment-17099004 ] Joe Witt commented on MINIFICPP-1201:
- tagging as a blocker since the commit has been merged and yet licensing remains a concern

> Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
>
> Key: MINIFICPP-1201
> URL: https://issues.apache.org/jira/browse/MINIFICPP-1201
> [quoted issue description identical to the MINIFICPP-1201 message above]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (MINIFICPP-1199) Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
[ https://issues.apache.org/jira/browse/MINIFICPP-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Witt updated MINIFICPP-1199:
Priority: Blocker (was: Major)

> Integrates MiNiFi C++ with H2O Driverless AI Python Scoring Pipeline To Do ML Inference on Edge
>
> Key: MINIFICPP-1199
> URL: https://issues.apache.org/jira/browse/MINIFICPP-1199
> [quoted issue description identical to the MINIFICPP-1199 message above]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
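The ConvertDsToCsv conversion described in MINIFICPP-1199 above (pandas dataframe → CSV in an in-memory StringIO with the index dropped, then read() back out) comes down to a few lines. The dataframe here is a made-up stand-in for a frame loaded from flow file content via H2O's datatable, and the column names are hypothetical.

```python
import io
import pandas as pd

# Stand-in for test data loaded from the incoming flow file.
df = pd.DataFrame({"psa_bar": [155.6, 155.5], "cool_pct": [3.2, 47.0]})

buf = io.StringIO()
df.to_csv(buf, index=False)  # write CSV without the dataframe index
buf.seek(0)                  # rewind so read() returns the full CSV string
csv_text = buf.read()
print(csv_text)
# psa_bar,cool_pct
# 155.6,3.2
# 155.5,47.0
```

The resulting string is what the processor writes back as the outgoing flow file content.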
[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #775: MINIFICPP-965 Add other supported relationships to InvokeHTTP processor.
msharee9 commented on a change in pull request #775: URL: https://github.com/apache/nifi-minifi-cpp/pull/775#discussion_r419491795

## File path: extensions/http-curl/client/HTTPClient.h ##

@@ -124,11 +135,11 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
   bool setMinimumSSLVersion(SSLVersion minimum_version) override;

-  void setKeepAliveProbe(long probe){
+  void setKeepAliveProbe(std::chrono::milliseconds probe){
     keep_alive_probe_ = probe;
   }

-  void setKeepAliveIdle(long idle){

Review comment: Agree, especially when they are public members.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #775: MINIFICPP-965 Add other supported relationships to InvokeHTTP processor.
msharee9 commented on a change in pull request #775: URL: https://github.com/apache/nifi-minifi-cpp/pull/775#discussion_r419489924

## File path: extensions/http-curl/tests/HTTPHandlers.h ##

@@ -445,4 +445,49 @@ class HeartbeatHandler : public CivetHandler {
   bool isSecure;
 };

+class InvokeHTTPResponseOKHandler : public CivetHandler {
+public:
+  bool handlePost(CivetServer *, struct mg_connection *conn) {
+    std::stringstream headers;
+    headers << "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";

Review comment: Absolutely. stringstream is unnecessary here.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] szaszm commented on pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
szaszm commented on pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#issuecomment-623497918

Thanks for your effort towards fixing all of the issues we pointed out. The consume family of member functions will provide a significant improvement in the usability of the class. I also like that the unit tests now use sections. As I mentioned [here](https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419476317), feel free to question my code review comments anytime, as they are often just ideas to keep things simple/short and may be missing some important point. I'll do another review of the latest changes soon.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi] joewitt commented on pull request #4242: NIFI-7411: Integrate H2O Driverless AI MSP in NiFi
joewitt commented on pull request #4242: URL: https://github.com/apache/nifi/pull/4242#issuecomment-623497483

@james94 We need to be clear/careful here. The word 'open source' is not very helpful in this case.

License Compatibility: The ASF publishes clear guidelines for what is acceptable to include. It is detailed relative to NiFi concerns here: http://nifi.apache.org/licensing-guide.html. I don't know which open source license you're referring to for 'h2o-3' - please clarify. Further, it is not clear to me whether you're saying "we could close this PR and make a new one that uses that" or whether this PR works with that.

Maintainability by the community: We don't know h2o. So even understanding some of what you're talking about and the options available here is unclear. What happens in a few months if you're not helping maintain this and we need a committer to do so and there is a CVE or something we need to update? We've learned this lesson the hard way.

For the above reasons I think you should just avoid putting these h2o-specific items into Apache at all. Instead I think h2o should publish their nifi-h2o-nar and just tell us about it. We can help people learn it exists without having to maintain any code or deal with the unique licensing/usability concerns here.

To be clear, this stuff is cool. I hope folks can use it. I just don't want to sign up to spend time learning and parsing the above. This PR as-is I believe should be closed unless I'm missing something.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
szaszm commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419476317 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -29,132 +29,290 @@ namespace utils = org::apache::nifi::minifi::utils; -TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { - utils::ConcurrentQueue queue; - std::vector results; +namespace { + +namespace MinifiConcurrentQueueTestProducersConsumers { - std::thread producer([]() { + // Producers + + template + std::thread getSimpleProducerThread(Queue& queue) { +return std::thread([] { queue.enqueue("ba"); std::this_thread::sleep_for(std::chrono::milliseconds(3)); queue.enqueue("dum"); std::this_thread::sleep_for(std::chrono::milliseconds(3)); queue.enqueue("tss"); }); + } - std::thread consumer([, ]() { - while (results.size() < 3) { - std::string s; - if (queue.tryDequeue(s)) { - results.push_back(s); - } else { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - } + std::thread getBlockedProducerThread(utils::ConditionConcurrentQueue& queue, std::mutex& mutex) { +return std::thread([, ] { + std::unique_lock lock(mutex); + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); }); + } - producer.join(); - consumer.join(); + // Consumers + + std::thread getSimpleTryDequeConsumerThread(utils::ConcurrentQueue& queue, std::vector& results) { +return std::thread([, ] { + while (results.size() < 3) { +std::string s; +if (queue.tryDequeue(s)) { + results.push_back(s); +} else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); +} + } +}); + } - REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); -} + std::thread getSimpleConsumeConsumerThread(utils::ConcurrentQueue& queue, std::vector& results) { +return std::thread([, ] { + while (results.size() < 3) { +std::string s; +if (!queue.consume([] 
(const std::string& s) { results.push_back(s); })) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); +} + } +}); + } + std::thread getDequeueWaitConsumerThread(utils::ConditionConcurrentQueue& queue, std::vector& results) { +return std::thread([, ] { + std::string s; + while (queue.dequeueWait(s)) { +results.push_back(s); + } +}); + } -TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") { - utils::ConditionConcurrentQueue queue(true); - std::vector results; + std::thread getConsumeWaitConsumerThread(utils::ConditionConcurrentQueue& queue, std::vector& results) { +return std::thread([, ] { + while (results.size() < 3) { +std::string s; +if (!queue.consumeWait([] (const std::string& s) { results.push_back(s); })) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); +} + } +}); + } - std::thread producer([]() { -queue.enqueue("ba"); -std::this_thread::sleep_for(std::chrono::milliseconds(3)); -queue.enqueue("dum"); -std::this_thread::sleep_for(std::chrono::milliseconds(3)); -queue.enqueue("tss"); - }); + std::thread getSpinningReaddingDequeueConsumerThread(utils::ConcurrentQueue& queue, std::vector& results) { +return std::thread([, ] { + while (results.size() < 3) { +std::string s; +if (queue.tryDequeue(s)) { + // Unique elements only + if (!std::count(results.begin(), results.end(), s)) { +results.push_back(s); + } + queue.enqueue(std::move(s)); +} else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); +} + } +}); + } - std::thread consumer([, ]() { -std::string s; -while (queue.dequeueWait(s)) { - results.push_back(s); -} - }); + std::thread getReaddingDequeueConsumerThread(utils::ConditionConcurrentQueue& queue, std::vector& results) { +return std::thread([, ] { + std::string s; + while (queue.dequeueWait(s)) { +if (!std::count(results.begin(), results.end(), s)) { + results.push_back(s); +} +// The consumer is busy enqueing so noone is waiting for this ;( +queue.enqueue(std::move(s)); + } +}); + } - 
producer.join(); + std::thread getDequeueWaitForConsumerThread(utils::ConditionConcurrentQueue& queue, std::vector& results) { +return std::thread([, ] { + const std::size_t max_read_attempts = 6; + std::size_t attemt_num = 0; + while (results.size() < 3 && attemt_num < max_read_attempts) { +++attemt_num; +std::string s; +if
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
szaszm commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419471984

## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ##

@@ -29,132 +29,290 @@
+      const std::size_t max_read_attempts = 6;
+      std::size_t attemt_num = 0;

Review comment: I didn't notice the correct one on the previous line. Also, I meant no offense by linking the
[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
hunyadi-dev commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419407862

## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ##

@@ -29,132 +29,290 @@
+        // The consumer is busy enqueing so noone is waiting for this ;(
+        queue.enqueue(std::move(s));
[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
hunyadi-dev commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419407640

## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ##

@@ -29,132 +29,290 @@ (the quoted hunk replaces the inline producer/consumer lambdas of the concurrent-queue tests with named factory helpers; the lines under discussion are:)

+  std::thread getDequeueWaitForConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+    return std::thread([&queue, &results] {
+      const std::size_t max_read_attempts = 6;
+      std::size_t attemt_num = 0;
+      while (results.size() < 3 && attemt_num < max_read_attempts) {
+        ++attemt_num;
+        std::string s;

Review comment: Corrected to follow the spelling of line 138 :)
[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
hunyadi-dev commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419407398

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##

@@ -127,33 +184,58 @@ (the hunk under discussion adds an early-return check at the top of the new consumeWait():)

+  template <typename Functor>
+  bool consumeWait(Functor&& fun) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (!running_) {

Review comment: Removed.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
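The pattern under review can be reduced to a self-contained sketch. `MiniConditionQueue` below is a hypothetical, simplified stand-in for `ConditionConcurrentQueue` (unlike the code under review, it drains remaining elements after stop); it shows why the predicate passed to `cv_.wait` already handles the stopped case without an extra check before the wait:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

// Minimal illustration of the dequeueWait() pattern discussed above.
class MiniConditionQueue {
 public:
  void enqueue(std::string s) {
    { std::lock_guard<std::mutex> lock(mtx_); queue_.push_back(std::move(s)); }
    cv_.notify_one();
  }

  // Blocks until an element is available or stop() was called.
  bool dequeueWait(std::string& out) {
    std::unique_lock<std::mutex> lock(mtx_);
    // The predicate is checked before sleeping: if the queue is stopped or
    // non-empty, wait() returns immediately, which is why an explicit
    // "if (!running_) return false;" ahead of the wait adds nothing here.
    cv_.wait(lock, [this] { return !running_ || !queue_.empty(); });
    if (queue_.empty()) return false;
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  void stop() {
    { std::lock_guard<std::mutex> lock(mtx_); running_ = false; }
    cv_.notify_all();
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<std::string> queue_;
  bool running_{true};
};
```

A stopped, empty queue makes the predicate true on entry, so the caller gets `false` back without ever blocking.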
[jira] [Created] (NIFI-7421) Support SASL/OAUTHBEARER in Kafka processors
Bryan Bende created NIFI-7421: - Summary: Support SASL/OAUTHBEARER in Kafka processors Key: NIFI-7421 URL: https://issues.apache.org/jira/browse/NIFI-7421 Project: Apache NiFi Issue Type: Improvement Reporter: Bryan Bende Add support for SASL/OAUTHBEARER to the Kafka 2.x processors which now have an allowable values list of SASL mechanisms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
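For context, enabling SASL/OAUTHBEARER on a plain Kafka 2.x client typically takes configuration along these lines (illustrative client-side properties only; the ticket does not specify the corresponding NiFi processor property names):

```properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
```

Production deployments additionally supply a login callback handler that fetches real tokens; without one, the Kafka client falls back to unsecured tokens suitable only for testing.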
[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
hunyadi-dev commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419406615

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##

@@ -21,14 +21,49 @@ (the hunk under discussion adds new standard-library includes and opens a `namespace detail` block inside org::apache::nifi::minifi::utils)

Review comment: Extracted both functions to their according header, updated `StringUtils` to include `void_t` from the external definition.
[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
hunyadi-dev commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419379889

## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ##

@@ -29,132 +29,290 @@ (the quoted hunk replaces the inline producer/consumer lambdas of the concurrent-queue tests with named factory helpers such as getSimpleProducerThread, getSimpleTryDequeConsumerThread and getReaddingDequeueConsumerThread; the quote breaks off inside getDequeueWaitForConsumerThread)
[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
hunyadi-dev commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419378919

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##

@@ -127,33 +184,58 @@ (the hunk under discussion adds an early-return check at the top of the new consumeWait():)

+  template <typename Functor>
+  bool consumeWait(Functor&& fun) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (!running_) {

Review comment: I felt like refraining from waiting on the lock if the queue is stopped. This is redundant though and will be removed.
[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
hunyadi-dev commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419377662

## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ##

@@ -29,132 +29,290 @@ (the quoted hunk replaces the inline producer/consumer lambdas of the concurrent-queue tests with named factory helpers; the excerpt ends at the new getDequeueWaitForConsumerThread helper and its `attemt_num` counter)

Review comment: Thanks for linking the dictionary :)
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
szaszm commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419356467

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##

@@ -21,14 +21,49 @@ (the hunk under discussion opens a new `namespace detail` block)

Review comment: +1 for extracting the `void_t` implementation but -1 for `TryMoveCall` (at least until we need it elsewhere). If `void_t` is extracted, please adapt the partial detection idiom impl above `StringUtils::join_pack` to use the extracted one.
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
szaszm commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r418353449

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##

@@ -127,33 +147,58 @@ (the hunk moves the `if (!running_) return false;` check ahead of the condition-variable wait in dequeueWait(), dequeueApplyWait() and dequeueWaitFor(), e.g.:)

   template< class Rep, class Period >
   bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
     cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);

Review comment: The checks are now inconsistent between the dequeue and consume functions.

## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ##

@@ -29,132 +29,290 @@ (the quoted test-file hunk is the producer/consumer factory refactoring; the excerpt breaks off inside getConsumeWaitConsumerThread)
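The `dequeueWaitFor` variant discussed in this thread combines a timed condition-variable wait with one final dequeue attempt. A standalone sketch of that pattern, using a hypothetical minimal queue rather than the MiNiFi class:

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

// Timed-wait dequeue: returns false if nothing arrived within `timeout`.
class TimedQueue {
 public:
  void enqueue(int v) {
    { std::lock_guard<std::mutex> lock(mtx_); queue_.push_back(v); }
    cv_.notify_one();
  }

  template <class Rep, class Period>
  bool dequeueWaitFor(int& out, const std::chrono::duration<Rep, Period>& timeout) {
    std::unique_lock<std::mutex> lock(mtx_);
    // wait_for returns when the predicate becomes true or the timeout
    // expires; either way we make one final attempt under the lock.
    cv_.wait_for(lock, timeout, [this] { return !queue_.empty(); });
    if (queue_.empty()) return false;
    out = queue_.front();
    queue_.pop_front();
    return true;
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<int> queue_;
};
```

Because the final dequeue attempt runs regardless of how the wait ended, an element published just as the timeout fires is still delivered rather than lost.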
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #775: MINIFICPP-965 Add other supported relationships to InvokeHTTP processor.
arpadboda commented on a change in pull request #775: URL: https://github.com/apache/nifi-minifi-cpp/pull/775#discussion_r419326908 ## File path: extensions/http-curl/client/HTTPClient.cpp ## @@ -296,12 +296,13 @@ bool HTTPClient::submit() { } curl_easy_setopt(http_session_, CURLOPT_HEADERFUNCTION, ::HTTPHeaderResponse::receive_headers); curl_easy_setopt(http_session_, CURLOPT_HEADERDATA, static_cast(_response_)); - if (keep_alive_probe_ > 0){ -logger_->log_debug("Setting keep alive to %d",keep_alive_probe_); + if (keep_alive_probe_.count() > 0) { +const auto keepAlive = std::chrono::duration_cast>(keep_alive_probe_); Review comment: I think casting to std::chrono::seconds (which is actually duration) is a bit more talkative. ## File path: extensions/http-curl/client/HTTPClient.h ## @@ -124,11 +135,11 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable { bool setMinimumSSLVersion(SSLVersion minimum_version) override; - void setKeepAliveProbe(long probe){ + void setKeepAliveProbe(std::chrono::milliseconds probe){ keep_alive_probe_ = probe; } - void setKeepAliveIdle(long idle){ Review comment: Could we keep this as deprecated functions as well? ## File path: extensions/http-curl/tests/HTTPHandlers.h ## @@ -445,4 +445,49 @@ class HeartbeatHandler : public CivetHandler { bool isSecure; }; +class InvokeHTTPResponseOKHandler : public CivetHandler { +public: + bool handlePost(CivetServer *, struct mg_connection *conn) { +std::stringstream headers; +headers << "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; Review comment: I don't think stringstream makes sense in case we only put one string to it. Can we simply pass the hardcoded cstr to the mg_printf? 
## File path: extensions/http-curl/tests/HTTPHandlers.h ## @@ -445,4 +445,49 @@ class HeartbeatHandler : public CivetHandler { bool isSecure; }; +class InvokeHTTPResponseOKHandler : public CivetHandler { +public: + bool handlePost(CivetServer *, struct mg_connection *conn) { +std::stringstream headers; +headers << "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; +mg_printf(conn, headers.str().c_str()); +return true; + } +}; + +class InvokeHTTPResponse404Handler : public CivetHandler { +public: + bool handlePost(CivetServer *, struct mg_connection *conn) { +std::stringstream headers; +headers << "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; +mg_printf(conn, headers.str().c_str()); +return true; + } +}; + +class InvokeHTTPResponse501Handler : public CivetHandler { +public: + bool handlePost(CivetServer *, struct mg_connection *conn) { +std::stringstream headers; +headers << "HTTP/1.1 501 Not Implemented\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; +mg_printf(conn, headers.str().c_str()); +return true; + } +}; + +class InvokeHTTPResponseTimeoutHandler : public CivetHandler { +public: +InvokeHTTPResponseTimeoutHandler(std::chrono::milliseconds wait_ms) +: wait_(wait_ms) { +} + bool handlePost(CivetServer *, struct mg_connection *conn) { +std::this_thread::sleep_for(wait_); Review comment: Nice solution! 
## File path: libminifi/test/resources/TestInvokeHTTPPost.yml ## @@ -0,0 +1,161 @@ +MiNiFi Config Version: 3 +Flow Controller: + name: c++lw + comment: Created by MiNiFi C2 Flow Designer +Core Properties: + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis + max concurrent threads: 1 + variable registry properties: '' +FlowFile Repository: + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: +threshold: 2 +in period: 5 sec +in threads: 1 +out period: 5 sec +out threads: 4 +Content Repository: + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false +Provenance Repository: + provenance rollover time: 1 min + implementation: org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository +Component Status Repository: + buffer size: 1440 + snapshot frequency: 1 min +Security Properties: + keystore: '' + keystore type: '' + keystore password: '' + key password: '' + truststore: '' + truststore type: '' + truststore password: '' + ssl protocol: '' + Sensitive Props: +key: +algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL +provider: BC +Processors: +- id: 4ed2d51d-076a-49b0-88de-5cf5adf52a7e + name: GenerateFlowFile + class: org.apache.nifi.minifi.processors.GenerateFlowFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period:
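The cast suggested in the first comment above (prefer `std::chrono::seconds` over spelling out the equivalent `duration` type) can be sketched independently; `keepAliveSeconds` below is a hypothetical helper, not the HTTPClient API:

```cpp
#include <chrono>

// Convert a millisecond keep-alive probe interval to whole seconds.
// duration_cast truncates toward zero, so sub-second remainders are dropped;
// std::chrono::seconds names the target unit more readably than a raw
// duration<Rep, std::ratio<1>> spelling would.
long long keepAliveSeconds(std::chrono::milliseconds probe) {
  return std::chrono::duration_cast<std::chrono::seconds>(probe).count();
}
```

A caller holding `std::chrono::milliseconds(2500)` would pass `2` on to the underlying second-granularity option, matching the truncating behavior of the cast.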
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #773: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
arpadboda commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r419306193

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##
@@ -127,33 +184,58 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
   using ConcurrentQueue<T>::empty;
   using ConcurrentQueue<T>::clear;

   template<typename... Args>
   void enqueue(Args&&... args) {
     ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
     if (running_) {
       cv_.notify_one();
     }
   }
-
+
   bool dequeueWait(T& out) {
     std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    if (!running_) {
+      return false;
+    }
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor>
+  bool consumeWait(Functor&& fun) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (!running_) {

Review comment: I wonder why this is any better than the original solution. It's 3 lines longer, but for what? The original code didn't start waiting on the CV, as the predicate was true at the beginning, so it immediately returned false, just as this does. Could you explain the rationale behind this change?

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##
@@ -21,14 +21,49 @@
 #include
 #include
 #include
+#include
 #include
+#include

 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace utils {

+namespace detail {

Review comment: This is good stuff, but I think it should be in GeneralUtils.h, as this is not ConcurrentQueue-specific functionality.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (NIFI-6422) Handling of field type in BigQuery table schema definition is incorrect
[ https://issues.apache.org/jira/browse/NIFI-6422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pierre Villard updated NIFI-6422: - Status: Patch Available (was: Open)

> Handling of field type in BigQuery table schema definition is incorrect
> ---
>
> Key: NIFI-6422
> URL: https://issues.apache.org/jira/browse/NIFI-6422
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.9.2
> Reporter: Nicolas Delsaux
> Priority: Major
> Labels: easyfix, pull-request-available
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> When setting the field type to "record" in the BigQuery schema definition, the following exception is thrown:
>
> 2019-07-03 08:35:24,964 ERROR [Timer-Driven Process Thread-8] o.a.n.p.gcp.bigquery.PutBigQueryBatch PutBigQueryBatch[id=b2b1c6bf-016b-1000-e8c9-b3f9fb5b417e] null: java.lang.NullPointerException
> java.lang.NullPointerException: null
>   at org.apache.nifi.processors.gcp.bigquery.BigQueryUtils.mapToField(BigQueryUtils.java:42)
>   at org.apache.nifi.processors.gcp.bigquery.BigQueryUtils.listToFields(BigQueryUtils.java:68)
>   at org.apache.nifi.processors.gcp.bigquery.BigQueryUtils.schemaFromString(BigQueryUtils.java:80)
>   at org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch.onTrigger(PutBigQueryBatch.java:277)
>   at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>   at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
>   at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
>   at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>   at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
> This seems to happen because the type handling is not robust.
> Besides, it seems the TYPE case is not handled correctly, and the RECORD type is not handled at all.
--
This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #754: MINIFICPP-1151 fix most 1st party compiler warnings
arpadboda commented on a change in pull request #754: URL: https://github.com/apache/nifi-minifi-cpp/pull/754#discussion_r419275496

## File path: extensions/http-curl/tests/HTTPHandlers.h ##
@@ -15,34 +15,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
+#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
+
 #include "civetweb.h"
 #include "CivetServer.h"
 #include "concurrentqueue.h"
 #include "CivetStream.h"
 #include "io/CRCStream.h"
 #include "rapidjson/document.h"
+#include
+#include

-#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
-#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_

 static std::atomic transaction_id;
 static std::atomic transaction_id_output;

 class FlowObj {
  public:
-  FlowObj()
-      : total_size(0) {
+  FlowObj() = default;

-  }
-  explicit FlowObj(const FlowObj &other)
-      : total_size(std::move(other.total_size)),
+  FlowObj(FlowObj &&other) noexcept
+      : total_size(other.total_size),
         attributes(std::move(other.attributes)),
-        data(std::move(other.data)) {
+        data(std::move(other.data))
+  {
   }
-
-  uint64_t total_size;
+  uint64_t total_size{0};

Review comment: Okay, this makes sense, thanks!
[GitHub] [nifi] davidvoit commented on a change in pull request #4175: NIFI-7307 - Add User Metadata support to the Azure Blob Processors + LookupService
davidvoit commented on a change in pull request #4175: URL: https://github.com/apache/nifi/pull/4175#discussion_r419265483

## File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java ##
@@ -91,6 +124,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 }

 try {
+    if (!userMetadata.isEmpty()) {
+        blob.setMetadata(userMetadata);
+
+        StringBuilder userMetaBldr = new StringBuilder();
+        for (String userKey : userMetadata.keySet()) {
+            userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+        }

Review comment: This should be fixed now. (Only for the new code; if this is fine, I will create a new pull request for S3.)
[GitHub] [nifi] davidvoit commented on a change in pull request #4175: NIFI-7307 - Add User Metadata support to the Azure Blob Processors + LookupService
davidvoit commented on a change in pull request #4175: URL: https://github.com/apache/nifi/pull/4175#discussion_r419248566

## File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java ##
@@ -49,10 +54,38 @@
 @SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class, DeleteAzureBlobStorage.class })
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @WritesAttributes({
-    @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
+    @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched"),
+    @WritesAttribute(attribute = "azure.user.metadata.___", description = "If 'Write User Metadata' is set to 'True', the user defined metadata associated to the Blob object that is being listed " +
+        "will be written as part of the flowfile attributes")
 })
 public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {

+    protected static final PropertyDescriptor WRITE_USER_METADATA = new PropertyDescriptor.Builder()

Review comment: I moved it to AzureStorageUtils. I think this utils class exists because List and Fetch do not extend the same base class. But maybe Blob should be moved too? Anyway, that is not something for this pull request. Is this fine?