[GitHub] nifi-registry issue #1: NIFIREG-1 Initial project structure for NiFi Registr...
Github user bbende commented on the issue: https://github.com/apache/nifi-registry/pull/1

This project structure is a work in progress to begin development and is expected to continue evolving as the project matures, but this should give enough for others to start collaborating.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] nifi-registry pull request #1: NIFIREG-1 Initial project structure for NiFi ...
GitHub user bbende opened a pull request: https://github.com/apache/nifi-registry/pull/1

NIFIREG-1 Initial project structure for NiFi Registry

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbende/nifi-registry NIFIREG-1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi-registry/pull/1.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1

commit f5753966b58a4e1045eded8ffb36a3923fa0a31b
Author: Bryan Bende <bbe...@apache.org>
Date: 2017-03-16T17:45:16Z

    NIFIREG-1 Initial project structure for NiFi Registry
[GitHub] nifi pull request #608: NIFI-2171 Removing list of groups from User
GitHub user bbende opened a pull request: https://github.com/apache/nifi/pull/608

NIFI-2171 Removing list of groups from User

- Making FileAuthorizer not update the resource or action when updating an AccessPolicy
- Adding corresponding READ policies during initial seeding and legacy conversions
- Adding checks to FileAuthorizer to ensure only one policy per resource-action
- Removing merging of policies on legacy conversion since we have one action per policy now

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbende/nifi NIFI-2171

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/608.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #608

commit 91a32613be4f1f62e663a21c91763b3446fcbdf5
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-07-05T20:16:08Z

    NIFI-2171 Removing list of groups from User
    - Making FileAuthorizer not update the resource or action when updating an AccessPolicy
    - Adding corresponding READ policies during initial seeding and legacy conversions
    - Adding checks to FileAuthorizer to ensure only one policy per resource-action
    - Removing merging of policies on legacy conversion since we have one action per policy now
[GitHub] nifi pull request #581: NIFI-1916 Improvements to FileAuthorizer to not pars...
GitHub user bbende opened a pull request: https://github.com/apache/nifi/pull/581

NIFI-1916 Improvements to FileAuthorizer to not parse flow when unnecessary and to recreate missing authorizations.xml

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbende/nifi NIFI-1916-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/581.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #581

commit 9aa1e71cdd8e93d4acc3a04e6f62c1aaf52ac22e
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-06-24T21:04:44Z

    NIFI-1916 Improvements to FileAuthorizer to not parse flow when unnecessary and to recreate missing authorizations.xml
[GitHub] nifi issue #579: NIFI-1554: Changing process for cluster detection
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/579 +1 verified build passes contrib-check and was able to get into a secured NiFi instance
[GitHub] nifi issue #579: NIFI-1554: Changing process for cluster detection
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/579 Reviewing...
[GitHub] nifi issue #567: NIFI-1554: Continuing to incorporate component based author...
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/567 +1 looks good, verified full build passes with contrib-check and the new integration tests pass, ran the app a bit and verified basic functionality
[GitHub] nifi issue #567: NIFI-1554: Continuing to incorporate component based author...
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/567 Will start reviewing...
[GitHub] nifi pull request #566: NIFI-2061 Ensure nodes in a cluster use consistent a...
GitHub user bbende opened a pull request: https://github.com/apache/nifi/pull/566

NIFI-2061 Ensure nodes in a cluster use consistent authorizations

- Added methods to AbstractPolicyBasedAuthorizer to generate and inherit a fingerprint
- Updated StandardFlowSynchronizer to compare authorization fingerprints

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbende/nifi NIFI-2061

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/566.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #566

commit 54a056c9132207eb40217ddc191f2ca866f45939
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-06-20T20:58:42Z

    NIFI-2061 Added methods to AbstractPolicyBasedAuthorizer to generate and inherit a fingerprint
    - Updated StandardFlowSynchronizer to compare authorization fingerprints
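The fingerprint comparison this PR describes can be illustrated with a small standalone sketch. This is not the NiFi implementation: the class name, the "resource|action|users" serialization, and the SHA-256 choice are all assumptions made for illustration. The key idea is that the digest is computed over a *sorted* rendering of the policies, so two nodes holding the same authorizations in any iteration order produce identical fingerprints.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of a deterministic authorization fingerprint
// (illustrative only -- not AbstractPolicyBasedAuthorizer's actual format).
public class AuthorizationFingerprint {

    // Each policy is rendered by the caller as e.g. "resource|action|userIds".
    public static String fingerprint(List<String> policyLines) {
        // Sort first so the result is independent of policy iteration order.
        String canonical = policyLines.stream().sorted().collect(Collectors.joining("\n"));
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(canonical.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available on the JVM
        }
    }
}
```

A flow synchronizer can then reject a joining node whose fingerprint differs from the cluster's, which is the consistency check the PR adds.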
[GitHub] nifi pull request #556: NIFI-615 - Create a processor to extract WAV file ch...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/556#discussion_r68075384 --- Diff: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.media; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; + +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.sax.BodyContentHandler; +import org.xml.sax.SAXException; + +@InputRequirement(Requirement.INPUT_REQUIRED) 
+@Tags({"media", "file", "format", "metadata", "audio", "video", "image", "document", "pdf"}) +@CapabilityDescription("Extract the content metadata from flowfiles containing audio, video, image, and other file " ++ "types. This processor relies on the Apache Tika project for file format detection and parsing. It " ++ "extracts a long list of metadata types for media files including audio, video, and print media " ++ "formats." ++ "For the more details and the list of supported file types, visit the library's website " ++ "at http://tika.apache.org/.") +@WritesAttributes({@WritesAttribute(attribute = ".", description = "The extracted content metadata " ++ "will be inserted with the attribute name \".\", or \"\" if " ++ "\"Metadata Key Prefix\" is not provided.")}) +@SupportsBatching +public class ExtractMediaMetadata extends AbstractProcessor { + +static final PropertyDescriptor MAX_NUMBER_OF_ATTRIBUTES = new PropertyDescriptor.Builder() +.name("Max Number of Attributes") +.description("Specify the max number of attributes to add to the flowfile. There is no guarantee in what order" ++ " the tags will be processed. By default it will process all of them.") +.
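The "Max Number of Attributes" and "Metadata Key Prefix" behavior described in this diff can be sketched in isolation. The class and method names below are invented for illustration (this is not ExtractMediaMetadata's code); the point is simply that at most N metadata entries are copied to attributes, each optionally prefixed, and that the selection order carries no guarantee.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the attribute-limiting behavior described above
// (illustrative names, not the processor's actual API).
public class MetadataAttributeLimiter {

    public static Map<String, String> limit(Map<String, String> metadata,
                                            String prefix, int maxAttributes) {
        Map<String, String> attributes = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : metadata.entrySet()) {
            if (attributes.size() >= maxAttributes) {
                break; // Tika gives no ordering guarantee, so which tags survive is arbitrary
            }
            String key = (prefix == null || prefix.isEmpty()) ? e.getKey() : prefix + e.getKey();
            attributes.put(key, e.getValue());
        }
        return attributes;
    }
}
```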
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67779821 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; + +import javax.net.ssl.SSLContext; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. 
An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + *
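The delimiter-based message framing described above can be sketched independently of NiFi's socket machinery. The class below is invented for illustration: the sender appends the "Outgoing Message Delimiter" to each message's bytes, and the receiver recovers message boundaries by splitting the accumulated stream on that delimiter.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical sketch of delimiter framing (not PutTCP's actual code).
public class DelimitedFraming {

    // Sender side: append the delimiter so the receiver can find the boundary.
    public static byte[] frame(byte[] content, String delimiter) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.writeBytes(content);
        out.writeBytes(delimiter.getBytes(StandardCharsets.UTF_8));
        return out.toByteArray();
    }

    // Receiver side: split the accumulated stream back into messages.
    public static String[] split(byte[] stream, String delimiter) {
        return new String(stream, StandardCharsets.UTF_8).split(Pattern.quote(delimiter));
    }
}
```

As the comment notes, this only works when the delimiter cannot appear inside the content itself; otherwise the data must be re-encoded (e.g. Base64) first.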
[GitHub] nifi issue #543: NIFI-1834 Create PutTCP Processor
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/543 Looks good to me +1
[GitHub] nifi issue #513: PutHBaseJSON processor treats all values as Strings
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/513 @rtempleton just merged this work as part of PR 542, thanks for the contribution! can we close this PR?
[GitHub] nifi pull request #542: NIFI-1895 for 0.x
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/542#discussion_r67718748 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java --- @@ -89,6 +89,25 @@ .defaultValue(COMPLEX_FIELD_TEXT.getValue()) .build(); +protected static final String STRING_ENCODING_VALUE = "String"; +protected static final String BYTES_ENCODING_VALUE = "Bytes"; + +protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, +"Stores the value of each field as a UTF-8 String."); --- End diff -- The existing PutHBase always produced UTF-8 so I think we are ok there. I couldn't come up with great names for these choices so I would be open to something else if there was a better name, but I think if people read the descriptions of the values it will be clear.
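The "String" vs "Bytes" encoding choice discussed in this thread can be illustrated with a minimal sketch (illustrative names, not the PutHBaseJSON code). "String" stores the value's textual form as UTF-8 bytes, while "Bytes" stores the underlying binary representation, shown here for a long:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the two field-encoding options (not NiFi's code).
public class FieldEncoding {

    // "String" encoding: the value rendered as text, then UTF-8 bytes.
    public static byte[] asString(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    // "Bytes" encoding: the value's raw big-endian binary representation.
    public static byte[] asBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }
}
```

The practical difference: "String" cells are human-readable in the HBase shell but sort lexically, while "Bytes" cells are fixed-width and sort numerically for non-negative values.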
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67700294 --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java --- @@ -126,8 +127,9 @@ + "ensure that the FlowFile content does not contain the delimiter character to avoid errors. If it is not possible to define a delimiter " + "character that is not present in the FlowFile content then the user must use another processor to change the encoding of the data e.g. Base64 " + "encoding.") -.required(false) +.required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.defaultValue("\n") --- End diff -- I think this should be "\\n" so it shows up in the property of the processor, looks like the code already replaces "\\n" to "\n" before using the value
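The escaping behavior the comment refers to (the user types the two characters backslash-n in the property so it is visible in the UI, and the processor unescapes it to a real newline before use) can be sketched with a hypothetical helper, not NiFi's actual utility method:

```java
// Hypothetical sketch of the delimiter unescaping discussed above.
public class DelimiterUnescape {

    // Converts visible escape sequences typed into the property value
    // (backslash-n, backslash-r, backslash-t) into their control characters.
    public static String unescape(String propertyValue) {
        return propertyValue
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t");
    }
}
```

This is why the default value should be the escaped form: a literal newline default would render as an empty-looking property field.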
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67698814 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.stream.io.StreamUtils; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. 
An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + * + * failure - Where to route FlowFiles that failed to be sent. + * success - Where to route FlowFiles after the
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67698292 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.stream.io.StreamUtils; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. 
An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + * + * failure - Where to route FlowFiles that failed to be sent. + * success - Where to route FlowFiles after the
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67696982 --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java --- @@ -119,7 +119,24 @@ .defaultValue("10 seconds") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); - +public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder() +.name("Outgoing Message Delimiter") +.description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message " ++ "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should " ++ "ensure that the FlowFile content does not contain the delimiter character to avoid errors. If it is not possible to define a delimiter " ++ "character that is not present in the FlowFile content then the user must use another processor to change the encoding of the data e.g. Base64 " ++ "encoding.") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(true) +.build(); +public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder() --- End diff -- I'm wondering if we need this property... the pattern in AbstractPutEvent is to have a blocking queue of connections, up to the # of concurrent tasks for the processor. At the beginning of onTrigger it tries to acquire a connection, creates a new one if none available, and at the end returns to the queue unless the queue is full then it closes. There is also the concept of idle connections which get closed if they haven't been used in a certain amount of time. So unless there is a specific use case where we would want a new connection every single time, then I would opt for the above behavior. 
The only thing we might have to consider is that PutSplunk called pruneIdleSenders when flowFile == null, which would happen more frequently because it had @TriggerWhenEmpty. With this processor maybe we should be calling prune based on some other condition? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
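The acquire/return pattern described in the comment above can be sketched with a plain blocking queue. This is an illustrative stand-in only — the `Connection` class, pool sizing, and method names are hypothetical, not NiFi's actual AbstractPutEventProcessor code:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the pattern: poll the queue for an existing
// connection at the start of onTrigger, create one if none is available,
// and at the end return it to the queue unless the queue is full.
public class ConnectionPool {

    // Hypothetical stand-in for a ChannelSender/TCP connection.
    public static class Connection {
        final long lastUsed = System.currentTimeMillis();
        boolean closed;
        void close() { closed = true; }
    }

    private final BlockingQueue<Connection> idle;

    public ConnectionPool(int maxConcurrentTasks) {
        // Pool is bounded by the number of concurrent tasks.
        this.idle = new LinkedBlockingQueue<>(maxConcurrentTasks);
    }

    // Reuse an idle connection if possible, otherwise create a new one.
    public Connection acquire() {
        Connection c = idle.poll();
        return (c != null) ? c : new Connection();
    }

    // Return to the pool; if the pool is full, close the connection.
    public void release(Connection c) {
        if (!idle.offer(c)) {
            c.close();
        }
    }

    // Close connections that have been idle longer than the threshold.
    public void pruneIdle(long idleThresholdMillis) {
        final long now = System.currentTimeMillis();
        idle.removeIf(c -> {
            if (now - c.lastUsed > idleThresholdMillis) {
                c.close();
                return true;
            }
            return false;
        });
    }
}
```

The bounded queue naturally caps the connection count at the task parallelism, which is why a separate "connection per FlowFile" property may be unnecessary for the common case.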
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67697377 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.stream.io.StreamUtils; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. 
An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + * + * failure - Where to route FlowFiles that failed to be sent. + * success - Where to route FlowFiles after the
[GitHub] nifi issue #543: NIFI-1834 Create PutTCP Processor
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/543 I think the intent behind this processor was to send data over a connection, but not to read anything back. Essentially doing what PutSyslog and PutSplunk are doing when in TCP mode. That being said we could potentially have a header and footer, and specifying only one of them would be the same as a single delimiter. I think the idea behind the delimiter was more to separate the messages, typically using new lines, but not wanting to hard code that. Will have to review in more detail when I get a chance.
[GitHub] nifi issue #513: PutHBaseJSON processor treats all values as Strings
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/513 @rtempleton please take a look at https://github.com/apache/nifi/pull/542 I've incorporated your work there with some additional changes which I explained in the PR description. Let me know if this seems good to you, and hopefully we should be able to get this into 0.7.
[GitHub] nifi pull request #542: NIFI-1895 for 0.x
GitHub user bbende opened a pull request: https://github.com/apache/nifi/pull/542 NIFI-1895 for 0.x This pull request contains @rtempleton 's work from PR 513, but rebased for the 0.x branch. In addition, there is another commit by me which adds a new property to PutHBaseJSON allowing the user to specify how to store the values. The reason for this is that we can't change the behavior for existing users who may want the processor to continue working how it was. This change makes the default behavior the same as previous versions, but allows the user to enable the change Ryan made. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bbende/nifi NIFI-1895-0x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/542.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #542 commit 0ca89d019d1f04fbcb08a8ac6ff2916cf094f114 Author: rtempleton <ryan_temple...@yahoo.com> Date: 2016-06-09T15:16:49Z NIFI-1895 PutHBaseJSON processor treats all values as Strings The operator will now inspect the node value to determine type and convert as such. Numeric integral - Long (assumes widest type) Numeric not integral - Double (assumes widest type) Logical - Boolean everything else (including current Complex Type logic) - String Values that represent the row key continue to be implicitly treated as Strings by the processor Removed dependency on HBase utility Bytes class from the PutHBaseJSON processor. Convenience methods to encode to byte array are now wrapped by the appropriate HBaseClientService instance.
commit 04285af71b3aba741b3cbf480f7ea466c84791c3 Author: Bryan Bende <bbe...@apache.org> Date: 2016-06-17T21:10:40Z NIFI-1895 Adding a property to PutHBaseJSON to allow specifying how to store the values
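The type-inference rules from the first commit message above can be sketched roughly as follows. Note that `inferValue` is a hypothetical helper operating on string tokens, not the actual PutHBaseJSON implementation (which inspects JSON nodes directly):

```java
// Rough sketch of the value-type inference described in the commit
// message: logicals become Boolean, integral numbers widen to Long,
// non-integral numbers widen to Double, everything else stays a String.
public class ValueInference {

    public static Object inferValue(String token) {
        if ("true".equalsIgnoreCase(token) || "false".equalsIgnoreCase(token)) {
            return Boolean.valueOf(token);
        }
        try {
            return Long.valueOf(token);    // widest integral type
        } catch (NumberFormatException ignored) { }
        try {
            return Double.valueOf(token);  // widest non-integral type
        } catch (NumberFormatException ignored) { }
        return token;                      // fall back to String
    }
}
```

Keeping the default behavior (everything as Strings) and gating this inference behind a new property, as the second commit does, avoids silently changing byte encodings for existing users' tables.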
[GitHub] nifi issue #513: PutHBaseJSON processor treats all values as Strings
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/513 Reviewing and going to merge into 0.x and master if all looks good...
[GitHub] nifi issue #518: NIFI-1901: Component based access control tests
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/518 +1 looks good, verified a full build with contrib-check passes locally
[GitHub] nifi issue #535: NIFI-2041: Unable to refresh Controller Services when scope...
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/535 +1 looks good
[GitHub] nifi issue #507: NIFI-1804 Adding ability for FileAuthorizer to automaticall...
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/507 @mcgilman good call, I pushed a new commit that addresses your comment
[GitHub] nifi issue #513: PutHBaseJSON processor treats all values as Strings
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/513 That's a great point about leveraging any changes they make to the Bytes class... I'm now thinking, what if we added simple methods to the HBaseClientService interface [1] that wrapped the calls to Bytes? Something like: `byte[] toBytes(boolean b) byte[] toBytes(long l) byte[] toBytes(double d) byte[] toBytes(String s) ` Then HBase_1_1_2_ClientService would implement those methods and use the Bytes class from 1.1.2, and if someone implemented a 0.94 version it would be up to them to use the Bytes class from 0.94 or whatever they wanted to do. The processors never know they are dealing with the Bytes class. [1] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
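The proposed interface additions could look roughly like the sketch below, with a stand-in implementation. The exact encodings (big-endian numerics, UTF-8 strings, a single byte for booleans) are assumptions meant to mirror what a version-specific Bytes class would provide, and `ByteConversionService`/`Simple` are hypothetical names:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch: the service interface exposes toBytes(...) methods
// so processors never touch a version-specific HBase Bytes class directly.
public interface ByteConversionService {

    byte[] toBytes(boolean b);
    byte[] toBytes(long l);
    byte[] toBytes(double d);
    byte[] toBytes(String s);

    // Stand-in for what a concrete client service (e.g. one built against
    // HBase 1.1.2) might delegate to its bundled Bytes class for.
    class Simple implements ByteConversionService {
        @Override public byte[] toBytes(boolean b) { return new byte[] { (byte) (b ? -1 : 0) }; }
        @Override public byte[] toBytes(long l)    { return ByteBuffer.allocate(8).putLong(l).array(); }
        @Override public byte[] toBytes(double d)  { return ByteBuffer.allocate(8).putDouble(d).array(); }
        @Override public byte[] toBytes(String s)  { return s.getBytes(StandardCharsets.UTF_8); }
    }
}
```

This keeps the version-specific dependency isolated in the client-service NAR: a hypothetical 0.94 service would implement the same interface against its own Bytes class, and the processors would not change.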
[GitHub] nifi issue #509: NIFI-1982: Use Compressed check box value.
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/509 @ijokarumawak you can close this when you get a chance since it has been merged, thanks!
[GitHub] nifi issue #513: PutHBaseJSON processor treats all values as Strings
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/513 Ryan, thanks for submitting this PR! Is it possible that we could remove the dependency on hbase-client here? The reason is that the hbase processor bundle purposely didn't include the hbase-client and instead used the hbase-client-service (which is where the hbase-client dependency is). This means that someone could implement an hbase-client-service for another version of hbase, such as 0.94, and the processors don't have to change at all. I think adding this dependency here would make that not possible because a specific version of hbase-client would now be bundled with the processors. Seems like the reason for the dependency was to use the Bytes class that provides the conversion, which makes total sense. I'm wondering if we could look at what the code is doing and possibly write our own util, or if there is some other utility library to do this maybe we can use that. Thoughts?
[GitHub] nifi issue #509: NIFI-1982: Use Compressed check box value.
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/509 +1 looks good, will merge to 0.x... I am assuming this was only meant for 0.x and the same fix is in your other PR for http site-to-site
[GitHub] nifi issue #503: NIFI-1978: Restore RPG yield duration.
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/503 @ijokarumawak I merged to 0.x but GitHub doesn't auto-close from that branch, can you close this PR when you have a chance? Thanks.
[GitHub] nifi issue #503: NIFI-1978: Restore RPG yield duration.
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/503 +1 good find, verified the yield duration is maintained across restarts, will merge to 0.x
[GitHub] nifi issue #503: NIFI-1978: Restore RPG yield duration.
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/503 Reviewing...
[GitHub] nifi issue #499: NIFI-1052: Added "Ghost" Processors, Reporting Tasks, Contr...
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/499 +1 code looks good, build passes, tested a missing processor, controller service, and reporting task and the app still starts up, awesome stuff! Will merge into master shortly.
[GitHub] nifi issue #499: NIFI-1052: Added "Ghost" Processors, Reporting Tasks, Contr...
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/499 Reviewing...
[GitHub] nifi pull request #507: NIFI-1804 Adding ability for FileAuthorizer to autom...
GitHub user bbende opened a pull request: https://github.com/apache/nifi/pull/507 NIFI-1804 Adding ability for FileAuthorizer to automatically convert… …existing authorized-users.xml to new model You can merge this pull request into a Git repository by running: $ git pull https://github.com/bbende/nifi NIFI-1804 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/507.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #507 commit 3d7874ca73a92bac925c5a1aca354980a347d3c8 Author: Bryan Bende <bbe...@apache.org> Date: 2016-06-06T20:32:19Z NIFI-1804 Adding ability for FileAuthorizer to automatically convert existing authorized-users.xml to new model
[GitHub] nifi issue #489: NIFI-1951: Custom UIs
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/489 +1 full build passed contrib-check, ran the app and verified custom UIs still work
[GitHub] nifi issue #473: NIFI-1916 Updating FileAuthorizer to extend AbstractPolicyB...
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/473 @mcgilman updated the PR to address the thread-safety, a summary of the changes... - Created a single data structure to encapsulate all data structures used by the FileAuthorizer so there can be a single AtomicReference - Added synchronization to add, update, delete methods to ensure only one thread can modify the internal Authorizations reference - Included changes to expose the root group id to the authorizer which will be needed to auto-convert old users files - Brought back the old users.xsd and generation of the jaxb object to prep for auto-converting old users files
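The single-AtomicReference approach summarized in the first two bullets above can be illustrated generically. The `AuthorizationsStore`/`Holder` names and the single user map are hypothetical simplifications of the FileAuthorizer's internal structures:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the pattern: all state lives in one immutable holder behind
// a single AtomicReference. Reads are lock-free and always see a
// consistent snapshot; mutations are synchronized and swap in a fresh
// holder, so only one thread modifies the reference at a time.
public class AuthorizationsStore {

    // Hypothetical holder bundling the authorizer's data structures.
    public static final class Holder {
        final Map<String, String> usersById;
        Holder(Map<String, String> usersById) {
            this.usersById = Collections.unmodifiableMap(new HashMap<>(usersById));
        }
    }

    private final AtomicReference<Holder> ref =
            new AtomicReference<>(new Holder(new HashMap<>()));

    // Lock-free read path: one volatile read yields a consistent snapshot.
    public String getUser(String id) {
        return ref.get().usersById.get(id);
    }

    // Writers serialize so only one thread rebuilds the holder at a time.
    public synchronized void addUser(String id, String name) {
        Map<String, String> next = new HashMap<>(ref.get().usersById);
        next.put(id, name);
        ref.set(new Holder(next));
    }
}
```

Bundling every structure into one holder is what makes a single reference swap atomic: a reader can never observe, say, an updated user list paired with a stale policy list.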
[GitHub] nifi issue #482: NIFI-1928: Remove registration
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/482 +1 looks good, verified registration form no longer exists and access denied page is presented instead
[GitHub] nifi issue #482: NIFI-1928: Remove registration
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/482 Reviewing...
[GitHub] nifi issue #483: NIFI-1899 - Introduce ExtractEmailAttachments processor
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/483 Sorry that second line should have been something like: `split = session.putAttribute(split, "split.parent.uuid", parentUUID);` You wouldn't want to overwrite the regular UUID :)
[GitHub] nifi issue #483: NIFI-1899 - Introduce ExtractEmailAttachments processor
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/483 @trixpan if you want to add the UUID of the original FlowFile you would get it from the attributes: `String parentUUID = originalFlowFile.getAttribute(CoreAttributes.UUID.key());` Then add that to the child FlowFile: `split = session.putAttribute(split, CoreAttributes.UUID.key(), parentUUID);` Is that what you were looking to do?
[GitHub] nifi issue #483: NIFI-1899 - Introduce ExtractEmailAttachments processor
Github user bbende commented on the issue: https://github.com/apache/nifi/pull/483 @joewitt @trixpan I think the provenance linkage should already be happening automatically with the following line: `FlowFile split = session.create(originalFlowFile);` With that call, the framework will create a fork event from the parent for each attachment.
[GitHub] nifi pull request: NIFI-1916 Updating FileAuthorizer to extend Abs...
GitHub user bbende opened a pull request: https://github.com/apache/nifi/pull/473 NIFI-1916 Updating FileAuthorizer to extend AbstractPolicyBasedAuthor… …izer and adding initial loading of data users, groups, and policies - Implementing CRUD operations and unit tests for Users - Implementing CRUD operations and unit tests for Groups - Implementing CRUD operations and unit tests for AccessPolicies - Adding support for seeding with an initial admin user - Fixing delete for user and group so it removes references from policies - Adding example to authorizations.xml You can merge this pull request into a Git repository by running: $ git pull https://github.com/bbende/nifi NIFI-1916 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/473.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #473 commit 8fd48116d89362f8256c85f0981571ae6cf9612b Author: Bryan Bende <bbe...@apache.org> Date: 2016-05-25T17:22:05Z NIFI-1916 Updating FileAuthorizer to extend AbstractPolicyBasedAuthorizer and adding initial loading of data users, groups, and policies - Implementing CRUD operations and unit tests for Users - Implementing CRUD operations and unit tests for Groups - Implementing CRUD operations and unit tests for AccessPolicies - Adding support for seeding with an initial admin user - Fixing delete for user and group so it removes references from policies - Adding example to authorizations.xml
[GitHub] nifi pull request: NIFI-1907 Moving lazy init of SSLContext...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/457#issuecomment-221268368 Going to merge this in since Joe was a +1...
[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/452#issuecomment-221046683 @mcgilman @jtstorck @alopresto pushed two new commits, the first contains minor changes to address some of Matt's comments, the second is an attempt at what Matt suggested about how the MutableAuthorizer could be an abstract class that provides an implementation of authorize(), let me know what you think
[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/452#discussion_r64076734 --- Diff: nifi-api/src/main/java/org/apache/nifi/authorization/AccessPolicy.java --- @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.authorization; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +/** + * Defines a policy for a set of entities to perform a set of actions on a given resource. 
+ */ +public class AccessPolicy { + +private final String identifier; + +private final Resource resource; + +private final Set entities; + +private final Set actions; + +private AccessPolicy(final AccessPolicyBuilder builder) { +this.identifier = builder.identifier; +this.resource = builder.resource; + +Set entities = new HashSet<>(); +if (builder.entities != null) { +entities.addAll(builder.entities); +} +this.entities = Collections.unmodifiableSet(entities); + +Set actions = new HashSet<>(); +if (builder.actions != null) { +actions.addAll(builder.actions); +} +this.actions = Collections.unmodifiableSet(actions); + +if (this.identifier == null || this.identifier.trim().isEmpty()) { +throw new IllegalArgumentException("Identifier can not be null or empty"); +} + +if (this.resource == null) { +throw new IllegalArgumentException("Resource can not be null"); +} + +if (this.entities == null || this.entities.isEmpty()) { +throw new IllegalArgumentException("Entities can not be null or empty"); +} + +if (this.actions == null || this.actions.isEmpty()) { +throw new IllegalArgumentException("Actions can not be null or empty"); +} +} + +/** + * @return the identifier for this policy + */ +public String getIdentifier() { +return identifier; +} + +/** + * @return the resource for this policy + */ +public Resource getResource() { +return resource; +} + +/** + * @return an unmodifiable set of entity ids for this policy + */ +public Set getEntities() { +return entities; +} + +/** + * @return an unmodifiable set of actions for this policy + */ +public Set getActions() { +return actions; +} + +@Override +public boolean equals(Object obj) { +if (obj == null) { +return false; +} +if (getClass() != obj.getClass()) { +return false; +} + +final AccessPolicy other = (AccessPolicy) obj; +return Objects.equals(this.identifier, other.identifier); +} + +@Override +public int hashCode() { +return Objects.hashCode(this.identifier); +} + +@Override +public String toString() { +return 
String.format("identifier[%s], resource[%s], entityId[%s], action[%s]", +getIdentifier(), getResource().getIdentifier(), getEntities(), getActions(), ", "); +} + +/** + * Builder for Access Policies. + */ +public static class AccessPolicyBuilder { + +private String identifier; +private Resource resource; +private Set entities = new HashSet<>(); +private Set actions = new HashSet<>(); +private final boolean fromPolicy; + +/** + * Default constructor for building a new AccessPolic
[GitHub] nifi pull request: NIFI-1907 Moving lazy init of SSLContext...
GitHub user bbende opened a pull request: https://github.com/apache/nifi/pull/457 NIFI-1907 Moving lazy init of SSLContext... to StandardSiteToSiteClientConfig rather than the builder. Tested secure SiteToSite still working between two NiFi nodes, and now example Storm topology can also connect to a secured NiFi instance. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bbende/nifi NIFI-1907 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/457.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #457 commit 25bae0fd890b53f2f201d1e5475d5d0e1708580e Author: Bryan Bende <bbe...@apache.org> Date: 2016-05-20T15:15:36Z NIFI-1907 Moving lazy init of SSLContext to StandardSiteToSiteClientConfig rather than the builder
[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/452#discussion_r63760082

--- Diff: nifi-api/src/main/java/org/apache/nifi/authorization/AccessPolicy.java ---

@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Defines a policy for a set of entities to perform a set of actions on a given resource.
+ */
+public class AccessPolicy {
+
+    private final String identifier;
+
+    private final Resource resource;
+
+    private final Set entities;
+
+    private final Set actions;
+
+    /**
+     * Constructs a new policy with the given resource, entities, and actions.
+     *
+     * @param identifier the identifier of the policy
+     * @param resource the resource for the policy
+     * @param entities the entity ids for the policy (i.e. user or group ids)
+     * @param actions the actions for the policy
+     */
+    public AccessPolicy(final String identifier, final Resource resource, final Set entities, final Set actions) {
+        if (identifier == null || identifier.trim().isEmpty()) {
+            throw new IllegalArgumentException("Identifier can not be null or empty");
+        }
+
+        if (resource == null) {
+            throw new IllegalArgumentException("Resource can not be null");
+        }
+
+        if (entities == null || entities.isEmpty()) {
+            throw new IllegalArgumentException("Entities can not be null or empty");
+        }
+
+        if (actions == null || actions.isEmpty()) {
+            throw new IllegalArgumentException("Actions can not be null or empty");
+        }
+
+        this.identifier = identifier;
+        this.resource = resource;
+        this.entities = Collections.unmodifiableSet(entities);
+        this.actions = Collections.unmodifiableSet(actions);
+    }
+
+    /**
+     * @return the identifier for this policy
+     */
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * @return the resource for this policy
+     */
+    public Resource getResource() {
+        return resource;
+    }
+
+    /**
+     * @return the set of entity ids for this policy
+     */
+    public Set getEntities() {
+        return entities;
+    }
+
+    /**
+     * @return the set of actions for this policy
+     */
+    public Set getActions() {
+        return actions;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        final AccessPolicy other = (AccessPolicy) obj;
+
+        return this.identifier.equals(other.getIdentifier())
+                && this.resource.getIdentifier().equals(other.getResource().getIdentifier())
+                && this.entities.equals(other.entities)
+                && this.actions.equals(other.actions);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 53 * hash + Objects.hash(this.identifier, this.resource, this.entities, this.actions);

--- End diff --

Good call, will update all the hashCode implementations
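The simplification agreed in this review thread — keying equals and hashCode on the policy identifier alone and delegating to java.util.Objects — can be sketched as follows. This is an illustrative stand-alone class, not the NiFi source; the name PolicyId is hypothetical.

```java
import java.util.Objects;

// Minimal sketch of an identifier-keyed equals/hashCode pair.
class PolicyId {
    private final String identifier;

    PolicyId(final String identifier) {
        if (identifier == null || identifier.trim().isEmpty()) {
            throw new IllegalArgumentException("Identifier can not be null or empty");
        }
        this.identifier = identifier;
    }

    @Override
    public boolean equals(final Object obj) {
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return Objects.equals(identifier, ((PolicyId) obj).identifier);
    }

    @Override
    public int hashCode() {
        // Objects.hashCode matches the equals contract above directly;
        // no hand-rolled prime arithmetic is needed.
        return Objects.hashCode(identifier);
    }
}
```

Because equals compares only the identifier, hashCode must hash only the identifier as well, which is what makes the `53 * hash + Objects.hash(...)` pattern redundant here.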
[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/452#discussion_r63759950

--- Diff: nifi-api/src/main/java/org/apache/nifi/authorization/AccessPolicy.java ---

@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Defines a policy for a set of entities to perform a set of actions on a given resource.
+ */
+public class AccessPolicy {
+
+    private final String identifier;
+
+    private final Resource resource;
+
+    private final Set entities;
+
+    private final Set actions;
+
+    /**
+     * Constructs a new policy with the given resource, entities, and actions.
+     *
+     * @param identifier the identifier of the policy
+     * @param resource the resource for the policy
+     * @param entities the entity ids for the policy (i.e. user or group ids)
+     * @param actions the actions for the policy
+     */
+    public AccessPolicy(final String identifier, final Resource resource, final Set entities, final Set actions) {
+        if (identifier == null || identifier.trim().isEmpty()) {
+            throw new IllegalArgumentException("Identifier can not be null or empty");
+        }
+
+        if (resource == null) {
+            throw new IllegalArgumentException("Resource can not be null");
+        }
+
+        if (entities == null || entities.isEmpty()) {
+            throw new IllegalArgumentException("Entities can not be null or empty");
+        }
+
+        if (actions == null || actions.isEmpty()) {
+            throw new IllegalArgumentException("Actions can not be null or empty");
+        }
+
+        this.identifier = identifier;
+        this.resource = resource;
+        this.entities = Collections.unmodifiableSet(entities);
+        this.actions = Collections.unmodifiableSet(actions);
+    }
+
+    /**
+     * @return the identifier for this policy
+     */
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * @return the resource for this policy
+     */
+    public Resource getResource() {
+        return resource;
+    }
+
+    /**
+     * @return the set of entity ids for this policy
+     */
+    public Set getEntities() {
+        return entities;

--- End diff --

Good call, I prefer documenting in the javadoc
[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/452#issuecomment-220121052

@alopresto The intent was for the relationship to be bi-directional, meaning you could add a user to a group by adding/updating a User with a new group id in the set of groups, or by adding/updating a Group with a new user id in the set of users. The overall idea was to retrieve an object, make a copy of it with the desired changes, and then call update. So for the example scenarios...

- You could do this two different ways, but one would be to create a new Group instance with name "Admins" and no users and call addGroup(), then create a new User instance with name "Bryan Bende" and a set of group ids containing the id of the "Admins" group and call addUser().
- Same as above to create the group and user and place the user in the group... then you would retrieve the User instance for "Andrew LoPresto", make a copy of that User instance with the name set to "Andy LoPresto", and call updateUser(...).
- Same as scenario 1 to create the group and users and place the users in the group, although to add both users to the group in one action you could retrieve the Group, make a copy of the Group with both user ids added to the set of users, and call updateGroup(). Removing "Bryan Bende" from "Admins" could be done by copying the Group without "Bryan Bende"'s id in the set of users and calling updateGroup(), or by copying the User for "Bryan Bende" without the id of the "Admins" group in the set of groups and calling updateUser().

After thinking about all of these, it seems like it would be nice to have builders that make it convenient to update a User or Group... So something like...
```
User user = authorizer.getUser(id);
User updatedUser = new User.UserBuilder().fromUser(user).name("New Name").build();
authorizer.updateUser(updatedUser);
```

Also easily removing entries, so something like this for removing a user from a group:

```
User user = authorizer.getUser(id);
User updatedUser = new User.UserBuilder().fromUser(user).removeGroup(groupId).build();
authorizer.updateUser(updatedUser);
```

Thoughts?
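A copy-and-modify builder along the lines proposed above could be sketched as follows. The User/UserBuilder names and the fromUser/name/removeGroup methods follow the proposal in the comment; the identifier and addGroup pieces, and everything else here, are assumptions for illustration, not the final NiFi API.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: an immutable User with a builder that can be
// seeded from an existing instance, so callers copy-and-modify rather
// than mutate.
class User {
    private final String identifier;
    private final String name;
    private final Set<String> groups;

    private User(final UserBuilder builder) {
        this.identifier = builder.identifier;
        this.name = builder.name;
        this.groups = Collections.unmodifiableSet(new HashSet<>(builder.groups));
    }

    public String getIdentifier() { return identifier; }
    public String getName() { return name; }
    public Set<String> getGroups() { return groups; }

    static class UserBuilder {
        private String identifier;
        private String name;
        private Set<String> groups = new HashSet<>();

        // Seed the builder from an existing immutable User.
        UserBuilder fromUser(final User user) {
            this.identifier = user.identifier;
            this.name = user.name;
            this.groups = new HashSet<>(user.groups);
            return this;
        }

        UserBuilder identifier(final String identifier) { this.identifier = identifier; return this; }
        UserBuilder name(final String name) { this.name = name; return this; }
        UserBuilder addGroup(final String groupId) { groups.add(groupId); return this; }
        UserBuilder removeGroup(final String groupId) { groups.remove(groupId); return this; }
        User build() { return new User(this); }
    }
}
```

Because build() copies the group set and wraps it unmodifiable, the original User is untouched by later builder calls, which is what makes the retrieve/copy/update flow described above safe.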
[GitHub] nifi pull request: NIFI-1884 Defining API for Users, Groups, and P...
GitHub user bbende opened a pull request: https://github.com/apache/nifi/pull/452 NIFI-1884 Defining API for Users, Groups, and Policies This pull request introduces the concept of a MutableAuthorizer which is an interface that extends the recently introduced Authorizer. A MutableAuthorizer has the ability to manage users, groups, and policies. In addition this PR introduces the classes for user, group, and access policy. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bbende/nifi NIFI-1884 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/452.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #452 commit 2c9510ba4a2c3b42811606528a98f4b5ed7d09ac Author: Bryan Bende <bbe...@apache.org> Date: 2016-05-16T21:11:41Z NIFI-1884 Defining API for Users, Groups, and Policies
[GitHub] nifi pull request: NIFI-1858 Adding site-to-site reporting bundle
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/436#issuecomment-219738116 Since @markap14 has already reviewed this and merged it into 0.x branch, I am going to merge this into master myself so we don't lose track of this.
[GitHub] nifi pull request: NIFI-1554: Introducing component level revision...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/448#issuecomment-219736438 +1 Full build passes with contrib-check, and I was able to create a flow with controller services working as expected. Ran into an issue importing a template with a controller service, but it looks like we already captured that in NIFI-1882, so it should not prevent this PR.
[GitHub] nifi pull request: NIFI-1742: Initial support for component level ...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/435#issuecomment-218805610 Reviewing...
[GitHub] nifi pull request: NIFI-1858 Adding site-to-site reporting bundle
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/436#issuecomment-218766817 @markap14 here is the corresponding PR for merging the site-to-site reporting bundle into master
[GitHub] nifi pull request: NIFI-1858 Adding site-to-site reporting bundle
GitHub user bbende opened a pull request: https://github.com/apache/nifi/pull/436 NIFI-1858 Adding site-to-site reporting bundle PR for merging this into master. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bbende/nifi NIFI-1858-MASTER Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/436.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #436 commit ce13a2e02b88ab6f9a51c440c122645ba242a89f Author: Bryan Bende <bbe...@apache.org> Date: 2016-05-12T13:59:27Z NIFI-1858 Adding site-to-site reporting bundle
[GitHub] nifi pull request: NIFI-1858 Adding SiteToSiteProvenanceReportingT...
Github user bbende closed the pull request at: https://github.com/apache/nifi/pull/419
[GitHub] nifi pull request: NIFI-1858 Adding SiteToSiteProvenanceReportingT...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/419#discussion_r62548156

--- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java ---

@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
+@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
+public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask {
+
+    private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    private static final String LAST_EVENT_ID_KEY = "last_event_id";
+
+    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
+            .name("Platform")
+            .description("The value to use for the platform field in each provenance event.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .defaultValue("nifi")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private volatile long firstEventId = -1L;
+
+    @Override
+    protected List getSupportedPropertyDescriptors() {
+        final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(PLATFORM);
+        return properties;
+    }
+
+    private String getComponentName(final ProcessGroupStatus status, final ProvenanceEventRecord event) {
+        if (status == null) {
+            return null;
+        }
+
+        final String componentId = event.getComponentId();
+        if (status.getId().equals(componentId)) {
+            return status.getName();
+        }
+
+        for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
+            if (procStatus.getId().equals(compo
[GitHub] nifi pull request: NIFI-1858 Adding SiteToSiteProvenanceReportingT...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/419#discussion_r62505576

--- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java ---

@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for ReportingTasks that send data over site-to-site.
+ */
+public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask {
+
+    static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
+            .name("Destination URL")
+            .description("The URL to send the Provenance Events to. For example, to send to a NiFi instance running "
+                    + "at http://localhost:8080/nifi this value should be http://localhost:8080")
+            .required(true)

--- End diff --

@jvwing @mosermw I agree, I will push a commit that aligns it with the way RPGs take the url, thanks for reviewing
[GitHub] nifi pull request: NIFI-1781: First pass through updating canvas f...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/417#issuecomment-217541758 +1 Looks good, was able to run the app secured and unsecured to verify basic functionality, and build passes contrib-check.
[GitHub] nifi pull request: NIFI-981: Added ExecuteHiveQL and PutHiveQL pro...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/384#issuecomment-216595481 Latest changes look good, I am a +1 again :)
[GitHub] nifi pull request: NIFI-981: Added ExecuteHiveQL and PutHiveQL pro...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/384#discussion_r61832445

--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---

@@ -52,15 +54,27 @@
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"hive", "sql", "select", "jdbc", "query", "database"})
-@CapabilityDescription("Execute provided HiveQL SELECT query against a Hive database connection. Query result will be converted to Avro format."
+@CapabilityDescription("Execute provided HiveQL SELECT query against a Hive database connection. Query result will be converted to Avro or CSV format."
         + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
         + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
         + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
         + "select query. FlowFile attribute 'executehiveql.row.count' indicates how many rows were selected.")
-public class ExecuteHiveQL extends AbstractHiveQLProcessor {
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the MIME type for the outgoing flowfile to application/avro-binary for Avro or text/csv for CSV."),
+        @WritesAttribute(attribute = "filename", description = "Adds .avro or .csv to the filename attribute depending on which output format is selected."),
+        @WritesAttribute(attribute = "executehiveql.row.count", description = "Indicates how many rows were selected/returned by the query.")

--- End diff --

Nit-picking here, but given the rename of the processor, do we want this to be selecthiveql.row.count?
[GitHub] nifi pull request: NIFI-981: Added ExecuteHiveQL and PutHiveQL pro...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/384#discussion_r61832456

--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---

@@ -52,15 +54,27 @@
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"hive", "sql", "select", "jdbc", "query", "database"})
-@CapabilityDescription("Execute provided HiveQL SELECT query against a Hive database connection. Query result will be converted to Avro format."
+@CapabilityDescription("Execute provided HiveQL SELECT query against a Hive database connection. Query result will be converted to Avro or CSV format."
         + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
         + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
         + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
         + "select query. FlowFile attribute 'executehiveql.row.count' indicates how many rows were selected.")
-public class ExecuteHiveQL extends AbstractHiveQLProcessor {
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the MIME type for the outgoing flowfile to application/avro-binary for Avro or text/csv for CSV."),
+        @WritesAttribute(attribute = "filename", description = "Adds .avro or .csv to the filename attribute depending on which output format is selected."),
+        @WritesAttribute(attribute = "executehiveql.row.count", description = "Indicates how many rows were selected/returned by the query.")
+})
+public class SelectHiveQL extends AbstractHiveQLProcessor {

     public static final String RESULT_ROW_COUNT = "executehiveql.row.count";

--- End diff --

same as above
[GitHub] nifi pull request: NIFI-981: Added ExecuteHiveQL and PutHiveQL pro...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/384#issuecomment-216246029 Latest commits look good, I am a +1 and going to merge to 0.x and master
[GitHub] nifi pull request: NIFI-1554: Introduction of access controls on c...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/393#issuecomment-215872920 I'm a +1... reviewed and looks good, ran the application in secure and un-secure modes to verify functionality, passes contrib-check.
[GitHub] nifi pull request: NIFI-981: Added ExecuteHiveQL and PutHiveQL pro...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/384#discussion_r61502555

--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ExecuteHiveQL.java ---

@@ -152,8 +152,23 @@ public void process(final OutputStream out) throws IOException {
         logger.info("{} contains {} Avro records; transferring to 'success'",
                 new Object[]{fileToProcess, nrOfRows.get()});
-        session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
-                stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+
+        if (context.hasIncomingConnection()) {
+            // If the flow file came from an incoming connection, issue a Modify Content provenance event
+            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
+                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        } else {
+            // If we created a flow file from rows received from Hive, issue a Receive provenance event
+            // Determine the database URL from the connection metadata
+            String url = "jdbc:hive2://unknown-host";
+            try {
+                url = con.getMetaData().getURL();

--- End diff --

In your testing did con.getMetaData().getURL() return a value? Testing on a VM I am always getting unknown-host in my provenance events. I'm wondering if we should just take the value from the Database Connection URL property and use that as the URI here, instead of relying on the connection object.
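The fallback suggested in this comment — preferring the user-configured Database Connection URL when the driver metadata only yields the unknown-host placeholder — could be sketched as a small helper. The class, method, and constant names here are hypothetical, purely to illustrate the decision being discussed.

```java
// Hypothetical sketch of the URL-selection logic from the review comment:
// use the driver-reported URL when it is meaningful, otherwise fall back
// to the value the user configured on the processor.
class JdbcUrlResolver {
    static final String UNKNOWN_HOST_URL = "jdbc:hive2://unknown-host";

    static String resolve(final String metadataUrl, final String configuredUrl) {
        if (metadataUrl == null || metadataUrl.isEmpty() || UNKNOWN_HOST_URL.equals(metadataUrl)) {
            return configuredUrl;  // driver gave nothing useful; trust the property
        }
        return metadataUrl;
    }
}
```

The trade-off is that the property value reflects the user's intent even when the driver cannot report its own URL, at the cost of ignoring any normalization the driver might apply.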
[GitHub] nifi pull request: NIFI-981: Added ExecuteHiveQL and PutHiveQL pro...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/384#discussion_r61343688 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ExecuteHiveQL.java --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hive; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.hive.HiveDBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.hive.HiveJdbcCommon; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@EventDriven +@InputRequirement(Requirement.INPUT_ALLOWED) +@Tags({"hive", "sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided HiveQL SELECT query against a Hive database connection. Query result will be converted to Avro format." ++ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " ++ "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. 
" ++ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " ++ "select query. FlowFile attribute 'executehiveql.row.count' indicates how many rows were selected.") +public class ExecuteHiveQL extends AbstractHiveQLProcessor { + +public static final String RESULT_ROW_COUNT = "executehiveql.row.count"; + +// Relationships +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("Successfully created FlowFile from HiveQL query result set.") +.build(); +public static final Relationship REL_FAILURE = new Relationship.Builder() +.name("failure") +.description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") +.build(); + + +public static final PropertyDescriptor HIVEQL_SELECT_QUERY = new PropertyDescriptor.Builder() +.name("hive-query") --- End diff -- One thing I noticed was that when you drag on a brand new ExecuteHiveQL processor, the error message for the query says "hive-query is invalid". I'm wondering if this is actually a bug in the validator where it is using the name instead of the display name.
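The suspected bug can be shown with a minimal model. This is hypothetical plain Java, not NiFi's actual validator; it only illustrates why a message built from the machine-readable name reads as "hive-query is invalid" while one built from the display name would be user-friendly:

```java
// Hypothetical model of the name-vs-displayName validation message bug.
public class ValidationMessage {

    // Suspected behavior: the message uses the machine name.
    static String buggyMessage(String name, String displayName) {
        return name + " is invalid";
    }

    // Expected behavior: prefer the display name when one is set.
    static String fixedMessage(String name, String displayName) {
        String label = (displayName != null) ? displayName : name;
        return label + " is invalid";
    }

    public static void main(String[] args) {
        System.out.println(buggyMessage("hive-query", "HiveQL select query"));
        System.out.println(fixedMessage("hive-query", "HiveQL select query"));
    }
}
```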
[GitHub] nifi pull request: NIFI-981: Added ExecuteHiveQL and PutHiveQL pro...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/384#discussion_r61316112 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ExecuteHiveQL.java --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hive; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.hive.HiveDBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.hive.HiveJdbcCommon; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@EventDriven +@InputRequirement(Requirement.INPUT_ALLOWED) +@Tags({"hive", "sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided HiveQL SELECT query against a Hive database connection. Query result will be converted to Avro format." ++ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " ++ "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. 
" ++ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " ++ "select query. FlowFile attribute 'executehiveql.row.count' indicates how many rows were selected.") +public class ExecuteHiveQL extends AbstractHiveQLProcessor { + +public static final String RESULT_ROW_COUNT = "executehiveql.row.count"; + +// Relationships +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("Successfully created FlowFile from HiveQL query result set.") +.build(); +public static final Relationship REL_FAILURE = new Relationship.Builder() +.name("failure") +.description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") +.build(); + + +public static final PropertyDescriptor HIVEQL_SELECT_QUERY = new PropertyDescriptor.Builder() +.name("hive-query") +.displayName("HiveQL select query") +.description("HiveQL select query") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(true) +.build(); + +private final static List<PropertyDescriptor> propertyDescriptors; +private final static Set<Relationship> relationships; + +/*
[GitHub] nifi pull request: NIFI-981: Added ExecuteHiveQL and PutHiveQL pro...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/384#discussion_r61317929 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util.hive; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.SchemaBuilder.FieldAssembler; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +/** + * JDBC / HiveQL common functions. 
+ */ +public class HiveJdbcCommon { + +public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException { +return convertToAvroStream(rs, outStream, null, null); +} + + +public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) +throws SQLException, IOException { +final Schema schema = createSchema(rs, recordName); +final GenericRecord rec = new GenericData.Record(schema); + +final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); +try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { +dataFileWriter.create(schema, outStream); + +final ResultSetMetaData meta = rs.getMetaData(); +final int nrOfColumns = meta.getColumnCount(); +long nrOfRows = 0; +while (rs.next()) { +if (callback != null) { +callback.processRow(rs); +} +for (int i = 1; i <= nrOfColumns; i++) { +final int javaSqlType = meta.getColumnType(i); +final Object value = rs.getObject(i); + +if (value == null) { +rec.put(i - 1, null); + +} else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || javaSqlType == BLOB || javaSqlType == CLOB) { +// bytes requires little bit different handling +byte[] bytes = rs.getBytes(i); +
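The per-column dispatch in `convertToAvroStream` above can be illustrated with a self-contained mapping from JDBC types to Avro field types. Only the binary/large-object branch mirrors the quoted diff; the other mappings are illustrative assumptions, and `SqlToAvroType` is a hypothetical name, not part of HiveJdbcCommon's API:

```java
import static java.sql.Types.*;

// Hypothetical sketch of JDBC-type -> Avro-type dispatch.
public class SqlToAvroType {

    static String avroTypeFor(int javaSqlType) {
        switch (javaSqlType) {
            // Matches the branch quoted in the diff: fetched via getBytes().
            case BINARY: case VARBINARY: case LONGVARBINARY:
            case ARRAY: case BLOB: case CLOB:
                return "bytes";
            // Remaining mappings are assumptions for illustration only.
            case INTEGER: case SMALLINT: case TINYINT:
                return "int";
            case BIGINT:
                return "long";
            case FLOAT: case REAL:
                return "float";
            case DOUBLE:
                return "double";
            case BOOLEAN: case BIT:
                return "boolean";
            default:
                return "string"; // CHAR, VARCHAR, DATE, TIMESTAMP, ...
        }
    }

    public static void main(String[] args) {
        System.out.println(avroTypeFor(INTEGER));
        System.out.println(avroTypeFor(BLOB));
    }
}
```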
[GitHub] nifi pull request: NIFI-1554: Introducing new REST endpoints to al...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/374#issuecomment-213121166 Pulled down the latest and tested again, all looks good now, app seems to function properly in standalone and clustered mode. I'm a +1 to merge this.
[GitHub] nifi pull request: NIFI-1554: Introducing new REST endpoints to al...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/374#issuecomment-213101837 I was testing a 2 node cluster locally and noticed that the part in the toolbar that says Connected Nodes didn't show anything after it, but in the cluster view it showed both nodes in the table. Not sure if this is related to this PR or not, still investigating.
[GitHub] nifi pull request: NIFI-1554: Introducing new REST endpoints to al...
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/374#issuecomment-213080400 Reviewing...
[GitHub] nifi pull request: NIFI-1296[REVIEW_ONLY] initial commit of Kafka ...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/345#discussion_r60256873 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java --- @@ -0,0 +1,254 @@ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.FormatUtils; + +abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessionFactoryProcessor { + +private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; + +private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; + + +static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT"); +static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
+static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT"); +static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL"); + +static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder() +.name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) +.displayName("Kafka Brokers") +.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") +.required(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) +.expressionLanguageSupported(false) --- End diff -- Also wondering if we should support EL here for the same reason I mentioned in the other comment. Seems like the brokers and the topic would be the likely things to change across dev/prod environments.
[GitHub] nifi pull request: NIFI-1296[REVIEW_ONLY] initial commit of Kafka ...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/345#discussion_r60256134 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Consumes messages from Apache Kafka") +@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" }) +public class ConsumeKafka extends AbstractKafkaProcessor<KafkaConsumer<byte[], byte[]>> { + +static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset"); + +static final AllowableValue OFFSET_LATEST = new 
AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset"); + +static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group"); + +static final PropertyDescriptor TOPIC = TOPIC_BUILDER +.expressionLanguageSupported(false) --- End diff -- Any reason why EL is not supported here, but it is supported in PublishKafka? I think we would want both to support it for people who want to externalize properties into bootstrap.conf to make their flow portable across environments. I realize on the consumer side there is no FlowFile to evaluate against, but you can still evaluate without a flow file.
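The externalization idea behind supporting EL on the brokers and topic properties can be sketched as simple `${...}` variable substitution against values loaded from bootstrap.conf-style system properties. This is a simplified stand-in for NiFi's Expression Language, not its real implementation; `PropertyResolver` and the variable names are hypothetical:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of resolving ${...} references without a flow file.
public class PropertyResolver {

    private static final Pattern REF = Pattern.compile("\\$\\{([^}]+)}");

    static String resolve(String value, Map<String, String> vars) {
        Matcher m = REF.matcher(value);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            // Unknown variables resolve to an empty string in this sketch.
            m.appendReplacement(sb,
                    Matcher.quoteReplacement(vars.getOrDefault(m.group(1), "")));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> vars = Map.of(
                "kafka.brokers", "broker1:9092,broker2:9092",
                "kafka.topic", "events");
        System.out.println(resolve("${kafka.brokers}", vars));
        System.out.println(resolve("${kafka.topic}", vars));
    }
}
```

This is why EL is useful even with no FlowFile to evaluate against: the references resolve from the environment, letting the same flow point at dev or prod brokers.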
[GitHub] nifi pull request: NIFI-1551: Address issue working with Ports
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/364#issuecomment-211945609 Looks good, verified the change fixes the problem with ports, will push to master
[GitHub] nifi pull request: NIFI-1551: Address issue working with Ports
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/364#issuecomment-211941635 Reviewing...
[GitHub] nifi pull request: NIFI-1783: Load Authorizer
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/363#issuecomment-211937379 looks good, passes contrib-check, will push to master
[GitHub] nifi pull request: NIFI-1783: Load Authorizer
Github user bbende commented on the pull request: https://github.com/apache/nifi/pull/363#issuecomment-211931551 Reviewing...
[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/361#discussion_r60228292 --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.storm; + +import backtype.storm.Config; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Tuple; +import backtype.storm.utils.TupleUtils; +import org.apache.commons.lang3.Validate; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A Storm bolt that can send tuples back to NiFi. 
This bolt provides a micro-batching approach for higher + * throughput scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or + * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple + * immediately in a single transaction. + */ +public class NiFiBolt extends BaseRichBolt { + +private static final long serialVersionUID = 3067274587595578836L; +public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class); + +private final SiteToSiteClientConfig clientConfig; +private final NiFiDataPacketBuilder builder; +private final int tickFrequencySeconds; + +private SiteToSiteClient client; +private OutputCollector collector; +private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>(); + +private int batchSize = 10; +private int batchIntervalInSec = 10; +private long lastBatchProcessTimeSeconds = 0; + +public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) { +this.clientConfig = clientConfig; +this.builder = builder; +this.tickFrequencySeconds = tickFrequencySeconds; +Validate.notNull(this.clientConfig); +Validate.notNull(this.builder); +Validate.isTrue(this.tickFrequencySeconds > 0); +} + +public NiFiBolt withBatchSize(int batchSize) { +this.batchSize = batchSize; +Validate.isTrue(this.batchSize > 0); +return this; +} + +public NiFiBolt withBatchInterval(int batchIntervalInSec) { +this.batchIntervalInSec = batchIntervalInSec; +Validate.isTrue(this.batchIntervalInSec > 0); +return this; +} + +@Override +public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { +this.client = createSiteToSiteClient(); +this.collector = outputCollector; +this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000; + +LOGGER.info("Bolt is prepared with Batch Size " + batchSize ++ ", Batch Interval " + batchIntervalInSec ++ ", Tick Frequency is " + 
tickFrequencySeconds); +} + +protected SiteToSiteClient createSiteToSiteClient() { +return new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); +} + +@Override +public void execute(Tuple tuple) { +if (TupleUtils.isTick(tuple)) {
[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/361#discussion_r60227372 --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.storm; + +import backtype.storm.Config; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Tuple; +import backtype.storm.utils.TupleUtils; +import org.apache.commons.lang3.Validate; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A Storm bolt that can send tuples back to NiFi. 
This bolt provides a micro-batching approach for higher + * throughput scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or + * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple + * immediately in a single transaction. + */ +public class NiFiBolt extends BaseRichBolt { + +private static final long serialVersionUID = 3067274587595578836L; +public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class); + +private final SiteToSiteClientConfig clientConfig; +private final NiFiDataPacketBuilder builder; +private final int tickFrequencySeconds; + +private SiteToSiteClient client; +private OutputCollector collector; +private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>(); + +private int batchSize = 10; +private int batchIntervalInSec = 10; +private long lastBatchProcessTimeSeconds = 0; + +public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) { +this.clientConfig = clientConfig; +this.builder = builder; +this.tickFrequencySeconds = tickFrequencySeconds; +Validate.notNull(this.clientConfig); +Validate.notNull(this.builder); +Validate.isTrue(this.tickFrequencySeconds > 0); +} + +public NiFiBolt withBatchSize(int batchSize) { +this.batchSize = batchSize; +Validate.isTrue(this.batchSize > 0); +return this; +} + +public NiFiBolt withBatchInterval(int batchIntervalInSec) { +this.batchIntervalInSec = batchIntervalInSec; +Validate.isTrue(this.batchIntervalInSec > 0); +return this; +} + +@Override +public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { +this.client = createSiteToSiteClient(); +this.collector = outputCollector; +this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000; + +LOGGER.info("Bolt is prepared with Batch Size " + batchSize ++ ", Batch Interval " + batchIntervalInSec ++ ", Tick Frequency is " + 
tickFrequencySeconds); +} + +protected SiteToSiteClient createSiteToSiteClient() { +return new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); +} + +@Override +public void execute(Tuple tuple) { +if (TupleUtils.isTick(tuple)) {
[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...
Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/361#discussion_r60227347

--- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
+ * throughput scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
+ * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
+ * immediately in a single transaction.
+ */
+public class NiFiBolt extends BaseRichBolt {
+
+    private static final long serialVersionUID = 3067274587595578836L;
+    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
+
+    private final SiteToSiteClientConfig clientConfig;
+    private final NiFiDataPacketBuilder builder;
+    private final int tickFrequencySeconds;
+
+    private SiteToSiteClient client;
+    private OutputCollector collector;
+    private BlockingQueue queue = new LinkedBlockingQueue<>();
+
+    private int batchSize = 10;
+    private int batchIntervalInSec = 10;
+    private long lastBatchProcessTimeSeconds = 0;
+
+    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
+        this.clientConfig = clientConfig;
+        this.builder = builder;
+        this.tickFrequencySeconds = tickFrequencySeconds;
+        Validate.notNull(this.clientConfig);
+        Validate.notNull(this.builder);
+        Validate.isTrue(this.tickFrequencySeconds > 0);
+    }
+
+    public NiFiBolt withBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        Validate.isTrue(this.batchSize > 0);
+        return this;
+    }
+
+    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
+        this.batchIntervalInSec = batchIntervalInSec;
+        Validate.isTrue(this.batchIntervalInSec > 0);
+        return this;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.client = createSiteToSiteClient();
+        this.collector = outputCollector;
+        this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
+
+        LOGGER.info("Bolt is prepared with Batch Size " + batchSize
+                + ", Batch Interval " + batchIntervalInSec
+                + ", Tick Frequency is " + tickFrequencySeconds);
+    }
+
+    protected SiteToSiteClient createSiteToSiteClient() {
+        return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        if (TupleUtils.isTick(tuple)) {
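The flush condition described in the Javadoc above (send when the batch size is reached, or when the batch interval has elapsed) can be restated as a small standalone check. `BatchFlushCheck` and its parameter names are hypothetical; the actual bolt inlines this logic in `execute()`.

```java
// Hypothetical, self-contained restatement of NiFiBolt's micro-batching rule
// (this helper class is illustrative only and does not exist in the PR).
public class BatchFlushCheck {

    // Flush when the queue has reached the batch size, or when the batch
    // interval (in seconds) has elapsed since the last flush and there is
    // at least one queued tuple to send.
    public static boolean shouldFlush(int queuedTuples, int batchSize,
                                      long lastFlushSeconds, long nowSeconds,
                                      int batchIntervalInSec) {
        if (queuedTuples >= batchSize) {
            return true; // batch size reached
        }
        return queuedTuples > 0 && (nowSeconds - lastFlushSeconds) > batchIntervalInSec;
    }
}
```

With a batch size of 1 the first branch is taken for every tuple, which matches the "send each tuple immediately" behavior the Javadoc describes.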
[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...
Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/361#discussion_r60227326

--- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
+    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
+        this.batchIntervalInSec = batchIntervalInSec;
+        Validate.isTrue(this.batchIntervalInSec > 0);
--- End diff --

For constructors I always liked validating the state of the actual object after assignment, but I am fine either way, can change those.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
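The two validation styles discussed in the comment above differ only in what they check; the methods below are invented for this sketch, not the PR's code, and use plain `IllegalArgumentException` checks in place of commons-lang's `Validate` so the example stays self-contained.

```java
// Illustrative comparison of two guard styles for a builder-style setter.
public class ValidationStyles {

    private int batchSize = 10;

    // Style 1: validate the argument before assignment; bad input
    // leaves the object's previous state untouched.
    public ValidationStyles withBatchSizeArgChecked(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        this.batchSize = batchSize;
        return this;
    }

    // Style 2: assign first, then validate the object's state
    // (mirrors Validate.isTrue(this.batchSize > 0) in the diff).
    public ValidationStyles withBatchSizeStateChecked(int batchSize) {
        this.batchSize = batchSize;
        if (this.batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        return this;
    }

    public int getBatchSize() {
        return batchSize;
    }
}
```

Both styles reject the same inputs; the visible difference is whether a failed call can leave the field already overwritten.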
[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...
Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/361#discussion_r60226578

--- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
+    private int batchSize = 10;
+    private int batchIntervalInSec = 10;
+    private long lastBatchProcessTimeSeconds = 0;
--- End diff --

Each spout/bolt should only be accessed by one thread, so we should be good here.
[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...
GitHub user bbende opened a pull request:

https://github.com/apache/nifi/pull/361

NIFI-1778 Adding NiFiBolt to write back to NiFi from Storm

- Adding example topology that creates a full loop between NiFi and Storm.
- Bumping Storm to 0.10.0

There is an example topology in src/test/resources that can be used with a NiFi flow that has output port "Data for Storm" and input port "Data from Storm" and, of course, has site-to-site enabled in nifi.properties. An example template is here: https://gist.github.com/bbende/279824e65d07f63e0002727159b5d78b

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbende/nifi NIFI-1778

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/361.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #361

commit 9b46a22f617d8e058d6b97654bf8c2d84a2b4415
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-04-18T18:00:40Z

NIFI-1778 Adding NiFiBolt to write back to NiFi from Storm

- Adding example topology that creates a full loop between NiFi and Storm.
- Bumping Storm to 0.10.0
[GitHub] nifi pull request: NIFI-1630 Making TestPutUDP select an open port...
GitHub user bbende opened a pull request:

https://github.com/apache/nifi/pull/332

NIFI-1630 Making TestPutUDP select an open port for testing to avoid conflicts

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbende/nifi NIFI-1630

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/332.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #332

commit 13523bd08a07534d130987cbe5cd612a55be7c9a
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-04-06T19:09:39Z

NIFI-1630 Making TestPutUDP select an open port for testing to avoid conflicts
[GitHub] nifi pull request: NIFI-1698 Improving customValidate in AbstractH...
GitHub user bbende opened a pull request:

https://github.com/apache/nifi/pull/313

NIFI-1698 Improving customValidate in AbstractHadoopProcessor and HBa…

…seClient service to not reload Configuration unless it changed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbende/nifi NIFI-1698

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/313.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #313

commit 1859ed8660219650a54fbcf5504809302c7c6ac9
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-03-30T19:56:22Z

NIFI-1698 Improving customValidate in AbstractHadoopProcessor and HBaseClient service to not reload Configuration unless it changed
[GitHub] nifi pull request: NIFI-1697 Ensuring FlowController appropriately...
GitHub user bbende opened a pull request:

https://github.com/apache/nifi/pull/312

NIFI-1697 Ensuring FlowController appropriately wraps code with NarCloseable

From debugging this issue it was noticed that the problem only occurred while a PutHDFS processor was enabled (running/stopped), but if it was disabled the problem went away. This led to realizing that when the processor is running or stopped, validation is being called and was not being wrapped with NarCloseable to ensure the validation uses the same classpath as the component uses when executing. This PR ensures that the FlowController wraps validation with NarCloseable, and also when calling OnPrimaryNodeStateChanged, which needed to be wrapped as well.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbende/nifi NIFI-1697

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/312.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #312

commit 605c6c88b53076034560d5faf3399c7e2b3e98fd
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-03-30T15:46:52Z

NIFI-1697 Ensuring FlowController appropriately wraps code with NarCloseable
[GitHub] nifi pull request: NIFI-1691: Add Fetch Size property to QueryData...
Github user bbende commented on the pull request:

https://github.com/apache/nifi/pull/307#issuecomment-202935582

This looks good: tested against a Postgres DB and it works as expected, and the code looks good. Will merge to master.
[GitHub] nifi pull request: NIFI-1420 Fixing minor bugs in GetSplunk
GitHub user bbende opened a pull request:

https://github.com/apache/nifi/pull/299

NIFI-1420 Fixing minor bugs in GetSplunk

- Adding a Time Zone property so the Managed time ranges use the provided time zone when formatting the date strings
- Adding a Time Field Strategy property to choose between searching event time or index time
- Making the next iteration use previousLastTime + 1 ms to avoid overlap
- Fixing bug where GetSplunk incorrectly cleared state on a restart of NiFi

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbende/nifi NIFI-1420-GetSplunk-Issues

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/299.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #299

commit 3aed0ed8dbc46f4acb302f0cd63f7f9fde2c1713
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-03-22T15:14:03Z

NIFI-1420 Fixing minor bugs in GetSplunk

- Adding a Time Zone property so the Managed time ranges use the provided time zone when formatting the date strings
- Adding a Time Field Strategy property to choose between searching event time or index time
- Making the next iteration use previousLastTime + 1 ms to avoid overlap
- Fixing bug where GetSplunk incorrectly cleared state on a restart of NiFi
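The third bullet above (previousLastTime + 1 ms) is plain window arithmetic; a minimal sketch, with a hypothetical helper name, shows why consecutive managed time ranges cannot overlap:

```java
// Hypothetical sketch of the GetSplunk managed time-range fix: each search
// window is [earliest, latest] in epoch milliseconds, and the next window
// starts at previousLastTime + 1 ms so a boundary event is never read twice.
public class TimeRangeSketch {

    public static long[] nextWindow(long previousLastTimeMillis, long nowMillis) {
        return new long[] { previousLastTimeMillis + 1, nowMillis };
    }
}
```

Chaining two calls makes the property visible: the second window's earliest time is strictly greater than the first window's latest time.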
[GitHub] nifi pull request: NIFI-1488 Refactoring HBase Kerberos support
GitHub user bbende opened a pull request:

https://github.com/apache/nifi/pull/281

NIFI-1488 Refactoring HBase Kerberos support

- Storing UGI so we can support multiple HBaseClientServices with different configs
- Creating nifi-hadoop-utils to hold utility code shared between HDFS and HBase processors
- Incorporating KerberosProperties into existing hadoop processors

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbende/nifi NIFI-1488

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/281.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #281

commit 42766156c67542dc02d9ea9d8418c7f86bd7
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-03-15T18:58:03Z

NIFI-1488 Refactoring HBase Kerberos support

- Storing UGI so we can support multiple HBaseClientServices with different configs
- Creating nifi-hadoop-utils to hold utility code shared between HDFS and HBase processors
- Incorporating KerberosProperties into existing hadoop processors
[GitHub] nifi pull request: NIFI-1599 Changing DatagramChannelDispatcher, s...
Github user bbende closed the pull request at:

https://github.com/apache/nifi/pull/262
[GitHub] nifi pull request: NIFI-899 Rewrite of ListenUDP to use new listen...
GitHub user bbende opened a pull request:

https://github.com/apache/nifi/pull/266

NIFI-899 Rewrite of ListenUDP to use new listener framework, includes…

… the following changes:

- Adding Network Interface property to AbstractListenEventProcessor and ListenSyslog
- Adding sending host and sending port to DatagramChannelDispatcher
- Creation of common base class AbstractListenEventBatchingProcessor
- Refactor of ListenUDP, ListenTCP, and ListenRELP to all extend from AbstractListenEventBatchingProcessor

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbende/nifi NIFI-899

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/266.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #266

commit 84d09498c05a5325978bf80d61e065f2ae3b15b7
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-03-10T14:19:54Z

NIFI-899 Rewrite of ListenUDP to use new listener framework, includes the following changes:

- Adding Network Interface property to AbstractListenEventProcessor and ListenSyslog
- Adding sending host and sending port to DatagramChannelDispatcher
- Creation of common base class AbstractListenEventBatchingProcessor
- Refactor of ListenUDP, ListenTCP, and ListenRELP to all extend from AbstractListenEventBatchingProcessor
[GitHub] nifi pull request: NIFI-1607 Fixing issue in ListenRELP where it c...
Github user bbende closed the pull request at:

https://github.com/apache/nifi/pull/264
[GitHub] nifi pull request: NIFI-1607 Fixing issue in ListenRELP where it c...
GitHub user bbende opened a pull request:

https://github.com/apache/nifi/pull/264

NIFI-1607 Fixing issue in ListenRELP where it could commit the session before all flow files were transferred

Was able to reproduce this exception by writing a unit test that simulated data coming from two senders with batching turned on, which caused the code to try and commit the session in a loop that was processing the data for each sender. Moved the session.commit() outside the while loop, and acknowledged all messages after a successful commit. The above unit test passes now.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbende/nifi NIFI-1607

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/264.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #264

commit 699f84f951c0e3a99564074a0d0ff473a9c63c14
Author: Bryan Bende <bbe...@apache.org>
Date: 2016-03-08T23:04:28Z

NIFI-1607 Fixing issue in ListenRELP where it could commit the session before all flow files were transferred
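The ordering fix in the description above can be sketched with stand-in types; `Event`, `process`, and the string log below are simplifications for illustration, not the NiFi session API:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the ListenRELP fix: transfer flow files for every
// sender inside the loop, commit the session exactly once afterwards, and
// acknowledge messages only after the commit has succeeded.
public class CommitOrderingSketch {

    interface Event {
        void acknowledge();
    }

    static final List<String> LOG = new ArrayList<>();

    public static void process(List<List<Event>> eventsBySender) {
        List<Event> transferred = new ArrayList<>();
        for (List<Event> senderEvents : eventsBySender) {
            for (Event e : senderEvents) {
                LOG.add("transfer");   // stands in for session.transfer(...)
                transferred.add(e);
            }
        }
        LOG.add("commit");             // stands in for session.commit(), once, outside the loop
        for (Event e : transferred) {
            e.acknowledge();           // safe now: the session has been committed
        }
    }
}
```

The buggy version committed inside the per-sender loop, so a second sender's data could arrive after the session was already committed; pulling the commit out of the loop removes that window.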
[GitHub] nifi pull request: NIFI-1599 Changing DatagramChannelDispatcher, s...
Github user bbende commented on the pull request:

https://github.com/apache/nifi/pull/262#issuecomment-193929305

@olegz I didn't want to get into a complete refactoring of the different handlers/dispatchers, as there are some complicated subtleties there, but I took another shot and tried to at least ensure that the queuing logic is shared across them with a class that wraps the blocking queue. I suppose it could have been a static utility method, but I'm not usually a fan of static methods that need to take a lot of parameters.
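A wrapper like the one described might look roughly like this; the class name, capacity, and offer-timeout behavior are assumptions, since the comment only says the shared queuing logic moved into a class that wraps the blocking queue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a wrapper that centralizes the offer logic shared
// by the channel dispatchers/handlers, instead of a many-parameter static
// utility method.
public class EventQueueSketch<E> {

    private final BlockingQueue<E> events;
    private final long offerTimeoutMillis;

    public EventQueueSketch(int capacity, long offerTimeoutMillis) {
        this.events = new LinkedBlockingQueue<>(capacity);
        this.offerTimeoutMillis = offerTimeoutMillis;
    }

    // Returns false when the queue stayed full for the whole timeout, letting
    // the caller log and drop instead of blocking a reader thread forever.
    public boolean offer(E event) {
        try {
            return events.offer(event, offerTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public E poll() {
        return events.poll();
    }
}
```

Packaging the timeout with the queue means each dispatcher calls a one-argument `offer(event)` rather than repeating the timeout plumbing.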
[GitHub] nifi pull request: NIFI-1488 Added hbase kerb auth with ugi
Github user bbende commented on the pull request:

https://github.com/apache/nifi/pull/253#issuecomment-193920825

@rickysaltzer if you rebase to master I think the RELP failures should go away. Regarding testing, a couple of thoughts:

1) Could we update the validation test to include the new logic for Kerberos? https://github.com/apache/nifi/blob/40dd8a0a845ef5f4d4fde451f02376ab2fab9758/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java#L61

2) There are also a bunch of tests in there that mock everything. Not sure how feasible this is, but is it possible to add a test where we somehow mocked the static calls to UserGroupInformation to at least prove that the correct values are getting passed when security is enabled? I'm not that familiar with mocking static methods, so not sure what this entails.
[GitHub] nifi pull request: Nifi 1516 - AWS DynamoDB Get/Put/Delete Process...
Github user bbende commented on the pull request:

https://github.com/apache/nifi/pull/224#issuecomment-193064977

@mans2singh I think a commit I made on Friday exposed a race condition in TestListenRELP that was always there; we were just getting lucky before and it was never happening. I believe I have this fixed in the PR for adding the Splunk bundle, which should be merged soon and should then make this pass. Sorry for any confusion it created.
[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle
Github user bbende commented on the pull request:

https://github.com/apache/nifi/pull/233#issuecomment-192800737

@JPercivall Pushed up a commit that addresses your comment about Client Auth being required, and I think I also fixed the race condition in the RELP test. If you are good with everything, let me know and I will squash the commits.
[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle
Github user bbende commented on the pull request:

https://github.com/apache/nifi/pull/233#issuecomment-192466857

@JPercivall Pushed up another commit that addresses the additional comments from today. As part of this change I decided to go the route that @trixpan suggested and change ListenSplunkForwarder to ListenTCP, and as a result moved it to the standard bundle. This will open it up to a lot more use cases, and it wasn't really Splunk specific. I also decided to take out the mime.type attribute, since it is writing bytes to FlowFiles and may not really be text/plain all the time. Let me know if anything else needs updating or was left out.