[GitHub] nifi issue #563: NIFI-2078: External state management.
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/563 This currently has conflicts with master and needs to be rebased before it can be merged (sorry for taking so long to review the PR). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #563: NIFI-2078: External state management.
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r69829659 --- Diff: nifi-api/src/main/java/org/apache/nifi/components/state/ExternalStateManager.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.state; + +import org.apache.nifi.annotation.behavior.Stateful; + +import java.io.IOException; + +/** + * + * The ExternalStateManager is responsible for providing NiFi a mechanism for retrieving + * and clearing state stored in an external system a NiFi component interact with. + * + * + * + * When calling methods in this class, the state is always retrieved/cleared from external system + * regardless NiFi instance is a part of a cluster or standalone. + * + * + * + * This mechanism is designed to allow developers to easily store and retrieve small amounts of state. + * Since implementation of this interface interacts with remote system, one should consider the cost of + * retrieving this data, and the amount of data should be kept to the minimum required. + * + * + * + * Any component that wishes to implement ExternalStateManager should also use the {@link Stateful} annotation + * with {@link Scope#EXTERNAL} to provide a description of what state is being stored. + * If this annotation is not present, the UI will not expose such information or allow DFMs to clear the state. + * + */ +public interface ExternalStateManager { + +/** + * Returns the current state for the component. This return value may be null. + * + * @return the current state for the component or null if there is no state is retrieved + * @throws IOException if unable to communicate with the underlying storage mechanism + */ +StateMap getState() throws IOException; + +/** + * Clears all keys and values from the component's state + * + * @throws IOException if unable to communicate with the underlying storage mechanism + */ +void clear() throws IOException; --- End diff -- This should be renamed to "clearState" to match the corresponding "getState" and to better identified the method. Since it's an interface you never know what else the processor may want to clear and "clear" could end up being ambiguous. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #563: NIFI-2078: External state management.
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r69829390 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java --- @@ -239,4 +300,188 @@ private void releaseFlowFile(FlowFile flowFile, ProcessContext context, ProcessS this.getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] { flowFile, msgCount, executionDuration }); session.transfer(flowFile, REL_SUCCESS); } + +@Override +public StateMap getState() throws IOException { + +if (!isReadyToAccessState()) { +return null; +} + +final String groupId = kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG); +return submitConsumerGroupCommand("Fetch offsets", consumer -> { +final Map<String, String> partitionOffsets = consumer.partitionsFor(topic).stream() +.map(p -> new TopicPartition(topic, p.partition())) +.map(tp -> new ImmutablePair<>(tp, consumer.committed(tp))) +.filter(tpo -> tpo.right != null) +.collect(Collectors.toMap(tpo -> +"partition:" + tpo.left.partition(), +tpo -> String.valueOf(tpo.right.offset(; + +logger.info("Retrieved offsets from Kafka, topic={}, groupId={}, partitionOffsets={}", +topic, groupId, partitionOffsets); + +return new StandardStateMap(partitionOffsets, System.currentTimeMillis()); +}, null); +} + +private boolean isReadyToAccessState() { +if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(brokers) +|| kafkaProperties == null || StringUtils.isEmpty(kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG))) { +return false; +} +return true; +} + +/** + * Clear offsets stored in Kafka, by committing -1 as offset of each partitions of specified topic. + * + * Kafka allows commitSync if one of following conditions are met, + * see kafka.coordinator.GroupCoordinator.handleCommitOffsets for detail: + * + * The consumer is a member of the consumer group. In this case, + * even if there's other consumers connecting Kafka, offsets can be updated. + * It's dangerous to clear offsets if there're active consumers. + * When consumer.subscribe() and poll() are called, the consumer will be a member of the consumer group. + * + * There's no connected consumer within the group, + * and Kafka GroupCoordinator has marked the group as dead. + * It's safer but can take longer. + * + * + * The consumer group state transition is an async operation at Kafka group coordinator. + * Although clear() can only be called when the processor is stopped, + * the consumer group may not be fully removed at Kafka, in that case, CommitFailedException will be thrown. + * + * Following log msg can be found when GroupCoordinator has marked the group as dead + * in kafka.out on a Kafka broker server, it can take more than 30 seconds: + * [GroupCoordinator]: Group [gid] generation 1 is dead + * and removed (kafka.coordinator.GroupCoordinator) + * + */ +@Override +public void clear() throws IOException { + +if (!isReadyToAccessState()) { +return; +} + +final String groupId = kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG); +final Boolean result = submitConsumerGroupCommand("Clear offsets", consumer -> { --- End diff -- Same comment as on GetKafka, should this block the onTrigger from getting new messages? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #563: NIFI-2078: External state management.
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r69828743 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java --- @@ -166,4 +170,44 @@ public MessageAndMetadata answer(InvocationOnMock invocation) throws Throwable { } } +@Test +public void testGetState() throws Exception { +final GetKafka processor = new GetKafka(); +final TestRunner runner = TestRunners.newTestRunner(processor); + +assertNull("State should be null when required properties are not specified.", processor.getState()); + +runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "0.0.0.0:invalid-port"); +runner.setProperty(GetKafka.TOPIC, "testX"); + +assertNull("State should be null when required properties are not specified.", processor.getState()); + +runner.setProperty(GetKafka.GROUP_ID, "consumer-group-id"); + +try { +processor.getState(); +fail("The processor should try to access Zookeeper and should fail since it can not connect."); +} catch (IOException e) { +} +} + +@Test +public void testClearState() throws Exception { +final GetKafka processor = new GetKafka(); +final TestRunner runner = TestRunners.newTestRunner(processor); + +// Clear doesn't do anything until required properties are set. +processor.clear(); + +runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "0.0.0.0:invalid-port"); +runner.setProperty(GetKafka.TOPIC, "testX"); +runner.setProperty(GetKafka.GROUP_ID, "consumer-group-id"); + +try { +processor.getState(); --- End diff -- This is a unit test to clearState but calls getState here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #563: NIFI-2078: External state management.
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r69827581 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java --- @@ -481,4 +496,51 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId); + +return new StandardStateMap(partitionOffsets, System.currentTimeMillis()); +} + +private boolean isReadyToAccessState() { +if(StringUtils.isEmpty(zookeeperConnectionString) +|| StringUtils.isEmpty(topic) +|| StringUtils.isEmpty(groupId)) { +return false; +} +return true; +} + +@Override +public void clear() throws IOException { +if (!isReadyToAccessState()) { +return; +} +KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId); --- End diff -- Shouldn't this block the onTrigger from attempting to reach out to Kafka to get new messages? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #563: NIFI-2078: External state management.
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r69827217 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java --- @@ -218,11 +232,13 @@ } public void createConsumers(final ProcessContext context) { -final String topic = context.getProperty(TOPIC).getValue(); +topic = context.getProperty(TOPIC).getValue(); --- End diff -- Why change the three values here? They are set in the "onPropertyModified" so whenever it's changed it will readjust. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #563: NIFI-2078: External state management.
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r69827142 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java --- @@ -184,6 +195,9 @@ private volatile long deadlockTimeout; private volatile ExecutorService executor; +private String zookeeperConnectionString; --- End diff -- These three will be accessed by different threads and should be marked volatile. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #563: NIFI-2078: External state management.
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/563 @ijokarumawak I believe you're right about it only being Kafka. I looked through them as well and couldn't find an instance of external state. While surprised I am definitely happy because it makes this PR a lot easier to review, lol. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #612: NIFI-2159: Fixed bug that caused relationship names not to ...
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/612 +1 Reviewed code, did a contrib check build, created a cluster to demonstrated the problem and then recreated with the fix and the issue was resolved. Thanks Mark will merge it in to 0.x and master --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #29: MINIFI-50 - Update READMEs and adjust docs
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/29#discussion_r69802769 --- Diff: minifi-docs/LICENSE --- @@ -0,0 +1,235 @@ + + Apache License + Version 2.0, January 2004 +http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import,
[GitHub] nifi-minifi pull request #29: MINIFI-50 - Update READMEs and adjust docs
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/29#discussion_r69802803 --- Diff: LICENSE --- @@ -0,0 +1,235 @@ + + Apache License + Version 2.0, January 2004 +http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfe
[GitHub] nifi-minifi pull request #29: MINIFI-50 - Update READMEs and adjust docs
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/29#discussion_r69802152 --- Diff: README.md --- @@ -18,7 +18,100 @@ MiNiFi is a child project effort of Apache NiFi ## Table of Contents +- [Features](#features) +- [Requirements](#requirements) +- [Getting Started](#getting-started) +- [Getting Help](#getting-help) +- [Documentation](#documentation) - [License](#license) +- [Export Control](#export-control) + +## Features + +Apache NiFi - MiNiFi is a complementary data collection approach that supplements the core tenets of [NiFi](http://nifi.apache.org/) in dataflow management, focusing on the collection of data at the source of its creation. + +Specific goals for MiNiFi are comprised of: +- small and lightweight footprint +- central management of agents +- generation of data provenance +- integration with NiFi for follow-on dataflow management and full chain of custody of information + +Perspectives of the role of MiNiFi should be from the perspective of the agent acting immediately at, or directly adjacent to, source sensors, systems, or servers. + +## Requirements +* JDK 1.8 or higher +* Apache Maven 3.1.0 or higher + +## Getting Started + +- Read through the [quickstart guide for development](http://nifi.apache.org/quickstart.html). --- End diff -- quick start guide gives pointers for how to start developing NiFi, not MiNiFi --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #29: MINIFI-50 - Update READMEs and adjust docs
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/29#discussion_r69801770 --- Diff: NOTICE --- @@ -0,0 +1,9 @@ +Apache NiFi - MiNiFi +Copyright 2014-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This product includes the following work from the Apache Hadoop project: + +BoundedByteArrayOutputStream.java adapted to SoftLimitBoundedByteArrayOutputStream.java --- End diff -- Should this notice information be in the root level pom of MiNiFi or as part of the Notice when NiFi dependencies are brought in? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/576 +1 Did a contrib check build, reviewed the code and comments were addressed, and did a functional test using vanilla ES as well as one secured with Shield. Thanks for the contribution @mattyb149 I will merge it into master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69745504 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java --- @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "fetch", "read", "get", "http"}) +@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the " ++ "identifier of the document to retrieve. Note that the full body of the document will be read into memory before being " ++ "written to a Flow File for transfer.") +@WritesAttributes({ +@WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"), +@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), +@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") +}) +public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("All FlowFiles that are read from Elasticsearch are routed to this relationship.") +.build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder() +.name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming " ++ "flow files will be routed to failure.") +.build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to th
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69745293 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java --- @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "fetch", "read", "get", "http"}) +@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the " ++ "identifier of the document to retrieve. Note that the full body of the document will be read into memory before being " ++ "written to a Flow File for transfer.") +@WritesAttributes({ +@WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"), +@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), +@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") +}) +public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("All FlowFiles that are read from Elasticsearch are routed to this relationship.") +.build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder() +.name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming " ++ "flow files will be routed to failure.") +.build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to th
[GitHub] nifi-minifi pull request #24: MINIFI-49 Made TestRestChangeNotifier upload f...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/24#discussion_r69504541 --- Diff: minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java --- @@ -84,6 +78,14 @@ public void testFileUpload() throws Exception { assertEquals("The result of notifying listeners:\nMockChangeListener successfully handled the configuration change\n", response.body().string()); -assertEquals(new String(Files.readAllBytes(file.toPath())), mockChangeListener.getConfFile()); +// Ignoring the system dependent line ending. +String confFile = mockChangeListener.getConfFile(); +if (confFile.endsWith("\n") || confFile.endsWith("\r")) { --- End diff -- You are totally right, will change --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #25: MINIFI-22 Removing lingering parts of UI
GitHub user JPercivall opened a pull request: https://github.com/apache/nifi-minifi/pull/25 MINIFI-22 Removing lingering parts of UI Most of this work was done with MINIFI-38 but there were still some lingering transitive dependencies being brought in that needed to be removed. You can merge this pull request into a Git repository by running: $ git pull https://github.com/JPercivall/nifi-minifi MINIFI-22 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi/pull/25.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #25 commit d98d0970bf1929dcd3da13a51c7dd4b27d08680d Author: Joseph Percivall <joeperciv...@yahoo.com> Date: 2016-07-05T01:33:44Z MINIFI-22 Removing lingering parts of UI --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #21: MINIFI-41 - CLI utility for template.xml -> YA...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/21#discussion_r69371933 --- Diff: minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.toolkit.configuration; + +import org.apache.nifi.controller.Template; +import org.apache.nifi.minifi.commons.schema.ConfigSchema; +import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema; +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaSaver; +import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.web.api.dto.ConnectableDTO; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.NiFiComponentDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.dto.TemplateDTO; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class ConfigMain { +public static final int ERR_INVALID_ARGS = 1; +public static final int ERR_UNABLE_TO_OPEN_OUTPUT = 2; +public static final int ERR_UNABLE_TO_OPEN_INPUT = 3; +public static final int ERR_UNABLE_TO_READ_TEMPLATE = 4; +public static final int ERR_UNABLE_TO_TRANFORM_TEMPLATE = 5; +public static final int ERR_UNABLE_TO_PARSE_CONFIG = 6; +public static final int ERR_INVALID_CONFIG = 7; + +public static final int SUCCESS = 0; + +public static final String TRANSFORM = "transform"; +public static final String VALIDATE = "validate"; + +private final Map<String, Command> commandMap; +private final PathInputStreamFactory pathInputStreamFactory; +private final PathOutputStreamFactory pathOutputStreamFactory; + +public ConfigMain() { +this(FileInputStream::new, FileOutputStream::new); +} + +public ConfigMain(PathInputStreamFactory pathInputStreamFactory, PathOutputStreamFactory pathOutputStreamFactory) { +this.pathInputStreamFactory = pathInputStreamFactory; +this.pathOutputStreamFactory = pathOutputStreamFactory; +this.commandMap = createCommandMap(); +} + +public static void main(String[] args) { +System.exit(new ConfigMain().execute(args)); +} + +public static void printValidateUsage() { +System.out.println("Validate Usage:"); +System.out.println(); +System.out.print("java "); +System.out.print(ConfigMain.class.getCanonicalName()); +System.out.println(" validate INPUT_FILE"); +System.out.println(); +} + +public int validate(String[] args) { +if (args.l
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69357651 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi-minifi pull request #21: MINIFI-41 - CLI utility for template.xml -> YA...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/21#discussion_r69352051 --- Diff: minifi-toolkit/minifi-toolkit-configuration/src/test/resources/Splunk_Processors_with_Listen_TCP.xml --- @@ -0,0 +1,18 @@ + + +Splunk Processors with Listen TCP420ccabf-c795-4a5f-a502-3deead673de3f1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f699882610d65054-5258-4cc4-9589-bd1185b07f85PROCESSOR0 sec1failuref1bee4e8-470b-41b7-97af-ada9f69988260ac7aebc-7692-4f37-804d-a7f03d2ddd1bPROCESSOR0b171f7c1-c640-4d17-8a4a-9cee8a0c49a6f1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f69988260ac7aebc-7692-4f37-804d-a7f03d2ddd1bPROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f69988266c7f545e-c272-4f45-893e-0838b75d744ePROCESSOR00c10de1c-4917-4bb7-94e3-d346d3a736c2f1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f6998826889f8d28-f936-457a-addb-c49ef7eed23bPROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f699882641eb952d-e8c8-4d9d-85a7-0aabb542ec56PROCESSOR0c6000652-74be-4bf8-b01f-7fcb1dd0b48cf1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f69988269fafd271-dcbd-4a6c-9625-29f28259fa4aPROCESSOR0 sec1failuref1bee4e8-470b-41b7-97af-ada9f6998826ff3b49ee-8535-4084-82e2-3aa22a10629dPROCESSOR0c6529f3f-b4f5-49cb-9a7a-93b9e52c71 faf1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f69988266c7f545e-c272-4f45-893e-0838b75d744ePROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f69988268bae21ad-6c89-4c01-9e71-b26340c50052PROCESSOR0fdbe0740-4be6-4712-917a-77613bb6089af1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f6998826ff3b49ee-8535 -4084-82e2-3aa22a10629dPROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f699882673212a32-1340-4423-a302-a884df0231b4PROCESSOR067557f37-c2f5-42b0-b2de-e7660ebc8145f1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f699882678471344-9f3d-4eb5-94f9-e25ee9a35f79PROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f6998826ed0fa412-fe42-47 7c-b799-eb3270287221PROCESSOR080944451-d3b5-45d8-a23d-ab214f6420b4f1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f6998826792bf873-97f4-4af1-8aa9-0534e79da810PROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f69988262ddc92dd-5f5b-4b3e-911d-d0aac1823b71PROCESSOR0871f7215-56c4-4733-927c-00236e82486eKeystore FilenameThe fully-qualified filename of the Keystore Keystore FilenamefalseKeystore FilenamefalsefalsefalseKeystore PasswordThe password for the KeystoreKeystore PasswordfalseKeystore PasswordfalsetruefalseKeystore TypeJKSJKSPKCS12PKCS12The Type of the KeystoreKeystore TypefalseKeystore TypefalsefalsefalseTruststore FilenameThe fully-qualified filename of the TruststoreTruststore FilenamefalseTruststore FilenamefalsefalsefalseTruststore PasswordThe password for the TruststoreTruststore PasswordfalseTruststore PasswordfalsetruefalseTruststore TypeJKSJKSPKCS12PKCS12The Type of the Truststore. Either JKS or PKCS12Truststore TypefalseTruststore TypefalsefalsefalseSSL ProtocolSSLSSLSSLv2HelloSSLv2HelloSSLv3SSLv3TLSTLSTLSv1TLSv1TLSv1.1TLSv1.1TLSv1.2TLSv1.2TLSThe algorithm to use for this SSL contextSSL ProtocolfalseSSL ProtocolfalsefalsefalseStandardSSLContextServiceKeystore Filename/home/osboxes/Dev/certs/distro-1.p12Keystore PasswordKeystore TypePKCS12Truststore Filename/home/osboxes/Dev/certs/truststore.jksTruststore PasswordTruststore TypeJKSSSL ProtocolTLS0PortThe port to listen on for communication.PortfalsePorttruefalsefalseReceive Buffer Size65507 BThe size of eac h buffer used to receive messages. Adjust this value appropriately based on the expected size of the incoming messages.Receive Buffer SizefalseReceive Buffer SizetruefalsefalseMax Size of Message Queue1The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total memory used by the processor.Max Size of Message QueuefalseMax Size of Message QueuetruefalsefalseMax Size of Socket Buffer< defaultValue>1 MBThe maximum size of the socket buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped.Max Size of Socket BufferfalseMax Size of Socket BuffertruefalsefalseCharacter SetUTF-8Specifies the character set of the received data.Character SetfalseCharacter SettruefalsefalseMax Number of TCP Connections2The maximum number of concurrent TCP connections to accept.Max Number of TCP ConnectionsfalseMax Number of TCP ConnectionstruefalsefalseMax Batch Size1The maximum number of messages to add to a
[GitHub] nifi-minifi pull request #21: MINIFI-41 - CLI utility for template.xml -> YA...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/21#discussion_r69342080 --- Diff: minifi-toolkit/minifi-toolkit-configuration/src/test/resources/Splunk_Processors_with_Listen_TCP.xml --- @@ -0,0 +1,18 @@ + + +Splunk Processors with Listen TCP420ccabf-c795-4a5f-a502-3deead673de3f1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f699882610d65054-5258-4cc4-9589-bd1185b07f85PROCESSOR0 sec1failuref1bee4e8-470b-41b7-97af-ada9f69988260ac7aebc-7692-4f37-804d-a7f03d2ddd1bPROCESSOR0b171f7c1-c640-4d17-8a4a-9cee8a0c49a6f1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f69988260ac7aebc-7692-4f37-804d-a7f03d2ddd1bPROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f69988266c7f545e-c272-4f45-893e-0838b75d744ePROCESSOR00c10de1c-4917-4bb7-94e3-d346d3a736c2f1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f6998826889f8d28-f936-457a-addb-c49ef7eed23bPROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f699882641eb952d-e8c8-4d9d-85a7-0aabb542ec56PROCESSOR0c6000652-74be-4bf8-b01f-7fcb1dd0b48cf1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f69988269fafd271-dcbd-4a6c-9625-29f28259fa4aPROCESSOR0 sec1failuref1bee4e8-470b-41b7-97af-ada9f6998826ff3b49ee-8535-4084-82e2-3aa22a10629dPROCESSOR0c6529f3f-b4f5-49cb-9a7a-93b9e52c71 faf1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f69988266c7f545e-c272-4f45-893e-0838b75d744ePROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f69988268bae21ad-6c89-4c01-9e71-b26340c50052PROCESSOR0fdbe0740-4be6-4712-917a-77613bb6089af1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f6998826ff3b49ee-8535 -4084-82e2-3aa22a10629dPROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f699882673212a32-1340-4423-a302-a884df0231b4PROCESSOR067557f37-c2f5-42b0-b2de-e7660ebc8145f1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f699882678471344-9f3d-4eb5-94f9-e25ee9a35f79PROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f6998826ed0fa412-fe42-47 7c-b799-eb3270287221PROCESSOR080944451-d3b5-45d8-a23d-ab214f6420b4f1bee4e8-470b-41b7-97af-ada9f69988260 MB0f1bee4e8-470b-41b7-97af-ada9f6998826792bf873-97f4-4af1-8aa9-0534e79da810PROCESSOR0 sec1successf1bee4e8-470b-41b7-97af-ada9f69988262ddc92dd-5f5b-4b3e-911d-d0aac1823b71PROCESSOR0871f7215-56c4-4733-927c-00236e82486eKeystore FilenameThe fully-qualified filename of the Keystore Keystore FilenamefalseKeystore FilenamefalsefalsefalseKeystore PasswordThe password for the KeystoreKeystore PasswordfalseKeystore PasswordfalsetruefalseKeystore TypeJKSJKSPKCS12PKCS12The Type of the KeystoreKeystore TypefalseKeystore TypefalsefalsefalseTruststore FilenameThe fully-qualified filename of the TruststoreTruststore FilenamefalseTruststore FilenamefalsefalsefalseTruststore PasswordThe password for the TruststoreTruststore PasswordfalseTruststore PasswordfalsetruefalseTruststore TypeJKSJKSPKCS12PKCS12The Type of the Truststore. Either JKS or PKCS12Truststore TypefalseTruststore TypefalsefalsefalseSSL ProtocolSSLSSLSSLv2HelloSSLv2HelloSSLv3SSLv3TLSTLSTLSv1TLSv1TLSv1.1TLSv1.1TLSv1.2TLSv1.2TLSThe algorithm to use for this SSL contextSSL ProtocolfalseSSL ProtocolfalsefalsefalseStandardSSLContextServiceKeystore Filename/home/osboxes/Dev/certs/distro-1.p12Keystore PasswordKeystore TypePKCS12Truststore Filename/home/osboxes/Dev/certs/truststore.jksTruststore PasswordTruststore TypeJKSSSL ProtocolTLS0PortThe port to listen on for communication.PortfalsePorttruefalsefalseReceive Buffer Size65507 BThe size of eac h buffer used to receive messages. Adjust this value appropriately based on the expected size of the incoming messages.Receive Buffer SizefalseReceive Buffer SizetruefalsefalseMax Size of Message Queue1The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total memory used by the processor.Max Size of Message QueuefalseMax Size of Message QueuetruefalsefalseMax Size of Socket Buffer< defaultValue>1 MBThe maximum size of the socket buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped.Max Size of Socket BufferfalseMax Size of Socket BuffertruefalsefalseCharacter SetUTF-8Specifies the character set of the received data.Character SetfalseCharacter SettruefalsefalseMax Number of TCP Connections2The maximum number of concurrent TCP connections to accept.Max Number of TCP ConnectionsfalseMax Number of TCP ConnectionstruefalsefalseMax Batch Size1The maximum number of messages to add to a
[GitHub] nifi issue #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/576 It would be nice to have integration tests similar to those in the original Fetch/PutElasticSearch processors. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69324539 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.HttpUrl; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "fetch", "read", "get", "http"}) +@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the " ++ "identifier of the document to retrieve.") +@WritesAttributes({ +@WritesAttribute(attribute = "filename", description = "The filename attributes is set to the document identifier"), +@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), +@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") +}) +public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed") +.build(); + +public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") +
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69323436 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Request; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.URL; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A base class for Elasticsearch processors that use the HTTP API + */ +public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor { + +public static final PropertyDescriptor ES_URL = new PropertyDescriptor.Builder() +.name("elasticsearch-http-url") +.displayName("Elasticsearch URL") +.description("Elasticsearch URL which will be connected to, including scheme, host, port, path. The default port for the REST API is 9200.") +.required(true) +.addValidator(StandardValidators.URL_VALIDATOR) +.build(); + +public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder() +.name("elasticsearch-http-proxy-host") +.displayName("Proxy Host") +.description("The fully qualified hostname or IP address of the proxy server") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder() +.name("elasticsearch-http-proxy-port") +.displayName("Proxy Port") +.description("The port of the proxy server") +.required(false) +.addValidator(StandardValidators.PORT_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder() +.name("elasticsearch-http-connect-timeout") +.displayName("Connection Timeout") +.description("Max wait time for the connection to the Elasticsearch REST API.") +.required(true) +.defaultValue("5 secs") +.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) +.build(); + +public static final PropertyDescriptor RESPONSE_TIMEOUT = new PropertyDescriptor.Builder() +.name("elasticsearch-http-response-timeout") +.displayName("Response Timeout") +.description("Max wait time for a response from the Elasticsearch REST API.") +.required(true) +.defaultValue("15 secs") +.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) +.build(); + +private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>(); + +
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69320973 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java --- @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.Call; +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Protocol; +import com.squareup.okhttp.Request; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestFetchElasticsearchHttp { + +private InputStream docExample; +private TestRunner runner; + +@Before +public void setUp() throws IOException { +ClassLoader classloader = Thread.currentThread().getContextClassLoader(); +docExample = classloader.getResourceAsStream("DocumentExample.json"); --- End diff -- Haha fair enough, as long as it works it doesn't matter to me --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69320860 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.HttpUrl; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "fetch", "read", "get", "http"}) +@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the " ++ "identifier of the document to retrieve.") +@WritesAttributes({ +@WritesAttribute(attribute = "filename", description = "The filename attributes is set to the document identifier"), +@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), +@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") +}) +public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed") +.build(); + +public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") +
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69320695 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml --- @@ -56,6 +56,11 @@ language governing permissions and limitations under the License. --> ${es.version} +com.squareup.okhttp +okhttp +2.7.1 --- End diff -- That's fair but in response to an issue found by a user on the mail list[1] invokeHttp will need to be updated anyway. So might as well code this on OkHttp v3 [1] https://issues.apache.org/jira/browse/NIFI-2162 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69319929 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java --- @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.Call; +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Protocol; +import com.squareup.okhttp.Request; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestFetchElasticsearchHttp { + +private InputStream docExample; +private TestRunner runner; + +@Before +public void setUp() throws IOException { +ClassLoader classloader = Thread.currentThread().getContextClassLoader(); +docExample = classloader.getResourceAsStream("DocumentExample.json"); --- End diff -- Is it necessary to have a document on disk? Just hesitant regarding OS issues. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69319708 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69319479 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69319085 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69318837 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69318008 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69317662 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69317330 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69317281 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69317035 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69316350 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " ++ "the index to insert into and the type of the document.") +public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") +.build(); + +public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("Identifier Attribute") +.description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", " ++ "this property may be left empty or evalua
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69314307 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.HttpUrl; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "fetch", "read", "get", "http"}) +@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the " ++ "identifier of the document to retrieve.") +@WritesAttributes({ +@WritesAttribute(attribute = "filename", description = "The filename attributes is set to the document identifier"), +@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), +@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") +}) +public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed") +.build(); + +public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") +
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69313807 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.HttpUrl; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "fetch", "read", "get", "http"}) +@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the " ++ "identifier of the document to retrieve.") +@WritesAttributes({ +@WritesAttribute(attribute = "filename", description = "The filename attributes is set to the document identifier"), +@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), +@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") +}) +public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed") +.build(); + +public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") +
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69313182 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.HttpUrl; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "fetch", "read", "get", "http"}) +@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the " ++ "identifier of the document to retrieve.") +@WritesAttributes({ +@WritesAttribute(attribute = "filename", description = "The filename attributes is set to the document identifier"), +@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), +@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") +}) +public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed") +.build(); + +public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") +
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69312668 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.squareup.okhttp.HttpUrl; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.ResponseBody; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "fetch", "read", "get", "http"}) +@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the " ++ "identifier of the document to retrieve.") +@WritesAttributes({ +@WritesAttribute(attribute = "filename", description = "The filename attributes is set to the document identifier"), +@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), +@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") +}) +public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + +private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + +public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +.description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed") +.build(); + +public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") +
[GitHub] nifi pull request #576: NIFI-2068: Add Elasticsearch HTTP processors
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/576#discussion_r69306232 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml --- @@ -56,6 +56,11 @@ language governing permissions and limitations under the License. --> ${es.version} +com.squareup.okhttp +okhttp +2.7.1 --- End diff -- What's the motiviation for using this version of OkHttp instead of the latest[1][2]? [1] https://github.com/square/okhttp/tree/parent-3.3.1 [2] https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #21: MINIFI-41 - CLI utility for template.xml -> YA...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/21#discussion_r69218372 --- Diff: minifi-commons/minifi-commons-schema/pom.xml --- @@ -0,0 +1,46 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.nifi.minifi +minifi-commons +0.0.1-SNAPSHOT + + +minifi-commons-schema +jar + + + +org.yaml +snakeyaml +1.17 + + +org.apache.nifi +nifi-framework-core +0.6.0 + + --- End diff -- (Accidentally commented on the wrong pom originally) This should probably be removed from this pom too since it will be used in both assemblies. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #21: MINIFI-41 - CLI utility for template.xml -> YA...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/21#discussion_r69214705 --- Diff: minifi-toolkit/minifi-toolkit-configuration/pom.xml --- @@ -0,0 +1,42 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +minifi-toolkit +org.apache.nifi.minifi +0.0.1-SNAPSHOT + +minifi-toolkit-configuration +jar + + + +org.apache.nifi.minifi +minifi-commons-schema +${project.version} + + +org.apache.nifi +nifi-framework-core +0.6.0 --- End diff -- This should be added to the Dependency Management portion of the root pom so that it shares the nifi version set there. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #21: MINIFI-41 - CLI utility for template.xml -> YA...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/21#discussion_r69210176 --- Diff: minifi-toolkit/minifi-toolkit-assembly/src/main/resources/config.sh --- @@ -0,0 +1,133 @@ +#!/bin/sh +# +#Licensed to the Apache Software Foundation (ASF) under one or more +#contributor license agreements. See the NOTICE file distributed with +#this work for additional information regarding copyright ownership. +#The ASF licenses this file to You under the Apache License, Version 2.0 +#(the "License"); you may not use this file except in compliance with +#the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. +# +# chkconfig: 2345 20 80 +# description: Apache NiFi - MiNiFi +# + +# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches + +SCRIPT_DIR=$(dirname "$0") +SCRIPT_NAME=$(basename "$0") +MINIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd) +PROGNAME=$(basename "$0") + + +warn() { +echo "${PROGNAME}: $*" +} + +die() { +warn "$*" +exit 1 +} + +detectOS() { +# OS specific support (must be 'true' or 'false'). +cygwin=false; +aix=false; +os400=false; +darwin=false; +case "$(uname)" in +CYGWIN*) +cygwin=true +;; +AIX*) +aix=true +;; +OS400*) +os400=true +;; +Darwin) +darwin=true +;; +esac +# For AIX, set an environment variable +if ${aix}; then + export LDR_CNTRL=MAXDATA=0xB000@DSA + echo ${LDR_CNTRL} +fi +} + +locateJava() { +# Setup the Java Virtual Machine +if $cygwin ; then +[ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}") +[ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}") +fi + +if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then +JAVA_HOME=$(java-config --jre-home) +fi +if [ "x${JAVA}" = "x" ]; then +if [ "x${JAVA_HOME}" != "x" ]; then +if [ ! -d "${JAVA_HOME}" ]; then +die "JAVA_HOME is not valid: ${JAVA_HOME}" +fi +JAVA="${JAVA_HOME}/bin/java" +else +warn "JAVA_HOME not set; results may vary" +JAVA=$(type java) +JAVA=$(expr "${JAVA}" : '.* \(/.*\)$') +if [ "x${JAVA}" = "x" ]; then +die "java command not found" +fi +fi +fi +# if command is env, attempt to add more to the classpath +if [ "$1" = "env" ]; then +[ "x${TOOLS_JAR}" = "x" ] && [ -n "${JAVA_HOME}" ] && TOOLS_JAR=$(find -H "${JAVA_HOME}" -name "tools.jar") +[ "x${TOOLS_JAR}" = "x" ] && [ -n "${JAVA_HOME}" ] && TOOLS_JAR=$(find -H "${JAVA_HOME}" -name "classes.jar") + if [ "x${TOOLS_JAR}" = "x" ]; then + warn "Could not locate tools.jar or classes.jar. Please set manually to avail all command features." +fi +fi --- End diff -- Is this block left over from the minifi.sh? I tried running it and got this: Joseph-Percivall:bin jpercivall$ ./config.sh env Java home: /Library/Java/JavaVirtualMachines/jdk1.8.0_74.jdk/Contents/Home MiNiFi Toolkit home: /Users/jpercivall/projects/edge/nifi-minifi/minifi-toolkit/minifi-toolkit-assembly/target/minifi-0.0.1-SNAPSHOT-bin/minifi-toolkit-0.0.1-SNAPSHOT Usage: java org.apache.nifi.minifi.toolkit.configuration.ConfigMain options Valid commands include: transform: Transform template xml into MiNiFi config YAML validate: Validate config YAML --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #21: MINIFI-41 - CLI utility for template.xml -> YA...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/21#discussion_r69180850 --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/BaseSchema.java --- @@ -15,13 +15,17 @@ * limitations under the License. */ -package org.apache.nifi.minifi.bootstrap.util.schema.common; +package org.apache.nifi.minifi.commons.schema; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class BaseSchema { --- End diff -- Ah very true, ok I'm on board with not making them static. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #21: MINIFI-41 - CLI utility for template.xml -> YA...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/21#discussion_r69178841 --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/BaseSchema.java --- @@ -15,13 +15,17 @@ * limitations under the License. */ -package org.apache.nifi.minifi.bootstrap.util.schema.common; +package org.apache.nifi.minifi.commons.schema; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class BaseSchema { --- End diff -- Right the "Validation Issue helper methods" access the validation issues, which is something every schema will have, so it's good to keep them implemented how they are. The "Value Access/Interpretation helper methods" are all methods that do not necessarily rely on the internal representation of each schema but instead are helper methods for accessing/interpreting values. So they can be static. For toMap I agree that it's good to encourage the use of LinkedHashMap. Maybe a protected method getMap for the schemas to use. Then toMap would be abstract. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #21: MINIFI-41 - CLI utility for template.xml -> YA...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/21#discussion_r69162642 --- Diff: minifi-toolkit/minifi-toolkit-assembly/NOTICE --- @@ -0,0 +1,21 @@ +Apache NiFi +Copyright 2014-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=== +Apache Software License v2 +=== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache NiFi +The following NOTICE information applies: + Apache NiFi + Copyright 2014-2016 The Apache Software Foundation + + (ASLv2) Jetty +The following NOTICE information applies: + Jetty Web Container --- End diff -- Is Jetty packaged in the toolkit-assembly? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi-minifi pull request #21: MINIFI-41 - CLI utility for template.xml -> YA...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/21#discussion_r69159093 --- Diff: minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SchemaLoader.java --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.error.YAMLException; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +public class SchemaLoader { +public static Map<String, Object> loadYamlAsMap(InputStream sourceStream) throws IOException, SchemaLoaderException { +try { +Yaml yaml = new Yaml(); + +// Parse the YAML file +final Object loadedObject = yaml.load(sourceStream); + +// Verify the parsed object is a Map structure +if (loadedObject instanceof Map) { +return (Map<String, Object>) loadedObject; +} else { +throw new SchemaLoaderException("Provided YAML configuration is not a Map"); +} +} catch (YAMLException e ) { +throw new IOException(e); +} finally { +sourceStream.close(); +} +} + +public static ConfigSchema loadConfigSchema(InputStream sourceStream) throws IOException, SchemaLoaderException { --- End diff -- Since this is just a wrapper around loadYamlAsMap and has the requirement that the input is Yaml, I'd prefer to have "Yaml" in somewhere in the method name. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #525: NIFI-1976 - Windows Event Log native processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/525#discussion_r68943688 --- Diff: nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java --- @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.windows.event.log; + +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.Kernel32Util; +import com.sun.jna.platform.win32.WinNT; +import org.apache.commons.io.Charsets; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.windows.event.log.jna.ErrorLookup; +import org.apache.nifi.processors.windows.event.log.jna.EventSubscribeXmlRenderingCallback; +import org.apache.nifi.processors.windows.event.log.jna.WEvtApi; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"ingest", "event", "windows"}) +@TriggerSerially +@CapabilityDescription("Registers a Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows. These can be filtered via channel and XPath.") +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "Will set a MIME type value of application/xml.") +}) +public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor { +public static final String DEFAULT_CHANNEL = "System"; +public static final String DEFAULT_XPATH = "*"; +public static final int DEFAULT_MAX_BUFFER = 1024 * 1024; +public static final int DEFAULT_MAX_QUEUE_SIZE = 1024; + +public static final PropertyDescriptor CHANNEL = new PropertyDescriptor.Builder() +.name("channel") +.displayName("Channel") +.required(true) +.defaultValue(DEFAULT_CHANNEL) +.description("The Windows Event Log Channel to listen to.") +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("query") +.displayName("XPath Query") +.required(true) +.defaultValue(DEFAULT_XPATH) +.description("XPath Que
[GitHub] nifi pull request #525: NIFI-1976 - Windows Event Log native processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/525#discussion_r68799710 --- Diff: nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java --- @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.windows.event.log; + +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.Kernel32Util; +import com.sun.jna.platform.win32.WinNT; +import org.apache.commons.io.Charsets; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.windows.event.log.jna.ErrorLookup; +import org.apache.nifi.processors.windows.event.log.jna.EventSubscribeXmlRenderingCallback; +import org.apache.nifi.processors.windows.event.log.jna.WEvtApi; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"ingest", "event", "windows"}) +@TriggerSerially +@CapabilityDescription("Registers a Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows. These can be filtered via channel and XPath.") +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "Will set a MIME type value of application/xml.") +}) +public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor { +public static final String DEFAULT_CHANNEL = "System"; +public static final String DEFAULT_XPATH = "*"; +public static final int DEFAULT_MAX_BUFFER = 1024 * 1024; +public static final int DEFAULT_MAX_QUEUE_SIZE = 1024; + +public static final PropertyDescriptor CHANNEL = new PropertyDescriptor.Builder() +.name("channel") +.displayName("Channel") +.required(true) +.defaultValue(DEFAULT_CHANNEL) +.description("The Windows Event Log Channel to listen to.") +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("query") +.displayName("XPath Query") +.required(true) +.defaultValue(DEFAULT_XPATH) +.description("XPath Que
[GitHub] nifi pull request #525: NIFI-1976 - Windows Event Log native processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/525#discussion_r68780468 --- Diff: nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java --- @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.windows.event.log; + +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.Kernel32Util; +import com.sun.jna.platform.win32.WinNT; +import org.apache.commons.io.Charsets; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.windows.event.log.jna.ErrorLookup; +import org.apache.nifi.processors.windows.event.log.jna.EventSubscribeXmlRenderingCallback; +import org.apache.nifi.processors.windows.event.log.jna.WEvtApi; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"ingest", "event", "windows"}) +@TriggerSerially +@CapabilityDescription("Registers a Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows. These can be filtered via channel and XPath.") +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "Will set a MIME type value of application/xml.") +}) +public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor { +public static final String DEFAULT_CHANNEL = "System"; +public static final String DEFAULT_XPATH = "*"; +public static final int DEFAULT_MAX_BUFFER = 1024 * 1024; +public static final int DEFAULT_MAX_QUEUE_SIZE = 1024; + +public static final PropertyDescriptor CHANNEL = new PropertyDescriptor.Builder() +.name("channel") +.displayName("Channel") +.required(true) +.defaultValue(DEFAULT_CHANNEL) +.description("The Windows Event Log Channel to listen to.") +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("query") +.displayName("XPath Query") +.required(true) +.defaultValue(DEFAULT_XPATH) +.description("XPath Que
[GitHub] nifi pull request #525: NIFI-1976 - Windows Event Log native processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/525#discussion_r68780983 --- Diff: nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java --- @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.windows.event.log; + +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.Kernel32Util; +import com.sun.jna.platform.win32.WinNT; +import org.apache.commons.io.Charsets; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.windows.event.log.jna.ErrorLookup; +import org.apache.nifi.processors.windows.event.log.jna.EventSubscribeXmlRenderingCallback; +import org.apache.nifi.processors.windows.event.log.jna.WEvtApi; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"ingest", "event", "windows"}) +@TriggerSerially +@CapabilityDescription("Registers a Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows. These can be filtered via channel and XPath.") +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "Will set a MIME type value of application/xml.") +}) +public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor { +public static final String DEFAULT_CHANNEL = "System"; +public static final String DEFAULT_XPATH = "*"; +public static final int DEFAULT_MAX_BUFFER = 1024 * 1024; +public static final int DEFAULT_MAX_QUEUE_SIZE = 1024; + +public static final PropertyDescriptor CHANNEL = new PropertyDescriptor.Builder() +.name("channel") +.displayName("Channel") +.required(true) +.defaultValue(DEFAULT_CHANNEL) +.description("The Windows Event Log Channel to listen to.") +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("query") +.displayName("XPath Query") +.required(true) +.defaultValue(DEFAULT_XPATH) +.description("XPath Que
[GitHub] nifi pull request #525: NIFI-1976 - Windows Event Log native processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/525#discussion_r68777286 --- Diff: nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java --- @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.windows.event.log; + +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.Kernel32Util; +import com.sun.jna.platform.win32.WinNT; +import org.apache.commons.io.Charsets; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.windows.event.log.jna.ErrorLookup; +import org.apache.nifi.processors.windows.event.log.jna.EventSubscribeXmlRenderingCallback; +import org.apache.nifi.processors.windows.event.log.jna.WEvtApi; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"ingest", "event", "windows"}) +@TriggerSerially +@CapabilityDescription("Registers a Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows. These can be filtered via channel and XPath.") +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "Will set a MIME type value of application/xml.") +}) +public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor { +public static final String DEFAULT_CHANNEL = "System"; +public static final String DEFAULT_XPATH = "*"; +public static final int DEFAULT_MAX_BUFFER = 1024 * 1024; +public static final int DEFAULT_MAX_QUEUE_SIZE = 1024; + +public static final PropertyDescriptor CHANNEL = new PropertyDescriptor.Builder() +.name("channel") +.displayName("Channel") +.required(true) +.defaultValue(DEFAULT_CHANNEL) +.description("The Windows Event Log Channel to listen to.") +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("query") +.displayName("XPath Query") +.required(true) +.defaultValue(DEFAULT_XPATH) +.description("XPath Que
[GitHub] nifi pull request #525: NIFI-1976 - Windows Event Log native processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/525#discussion_r68666372 --- Diff: nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java --- @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.windows.event.log; + +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.Kernel32Util; +import com.sun.jna.platform.win32.WinNT; +import org.apache.commons.io.Charsets; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.windows.event.log.jna.ErrorLookup; +import org.apache.nifi.processors.windows.event.log.jna.EventSubscribeXmlRenderingCallback; +import org.apache.nifi.processors.windows.event.log.jna.WEvtApi; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"ingest", "event", "windows"}) +@TriggerSerially +@CapabilityDescription("Registers a Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows. These can be filtered via channel and XPath.") +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "Will set a MIME type value of application/xml.") +}) +public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor { +public static final String DEFAULT_CHANNEL = "System"; +public static final String DEFAULT_XPATH = "*"; +public static final int DEFAULT_MAX_BUFFER = 1024 * 1024; +public static final int DEFAULT_MAX_QUEUE_SIZE = 1024; + +public static final PropertyDescriptor CHANNEL = new PropertyDescriptor.Builder() +.name("channel") +.displayName("Channel") +.required(true) +.defaultValue(DEFAULT_CHANNEL) +.description("The Windows Event Log Channel to listen to.") +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("query") +.displayName("XPath Query") +.required(true) +.defaultValue(DEFAULT_XPATH) +.description("XPath Que
[GitHub] nifi pull request #525: NIFI-1976 - Windows Event Log native processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/525#discussion_r68660039 --- Diff: nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java --- @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.windows.event.log; + +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.Kernel32Util; +import com.sun.jna.platform.win32.WinNT; +import org.apache.commons.io.Charsets; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.windows.event.log.jna.ErrorLookup; +import org.apache.nifi.processors.windows.event.log.jna.EventSubscribeXmlRenderingCallback; +import org.apache.nifi.processors.windows.event.log.jna.WEvtApi; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"ingest", "event", "windows"}) +@TriggerSerially +@CapabilityDescription("Registers a Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows. These can be filtered via channel and XPath.") +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "Will set a MIME type value of application/xml.") +}) +public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor { +public static final String DEFAULT_CHANNEL = "System"; +public static final String DEFAULT_XPATH = "*"; +public static final int DEFAULT_MAX_BUFFER = 1024 * 1024; +public static final int DEFAULT_MAX_QUEUE_SIZE = 1024; + +public static final PropertyDescriptor CHANNEL = new PropertyDescriptor.Builder() +.name("channel") +.displayName("Channel") +.required(true) +.defaultValue(DEFAULT_CHANNEL) +.description("The Windows Event Log Channel to listen to.") +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("query") +.displayName("XPath Query") +.required(true) +.defaultValue(DEFAULT_XPATH) +.description("XPath Que
[GitHub] nifi issue #573: NIFI-2089: Ensure streams are closed before attempting to r...
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/573 +1 Fixes errors seen on Windows, code looks good and passes contrib-check. Thanks Mark --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #569: NIFI-2097 Changing RemoteProcessGroupStatusTest.java...
Github user JPercivall closed the pull request at: https://github.com/apache/nifi/pull/569 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #570: NIFI-2098 Adding ignore annotation to testManyFilesO...
GitHub user JPercivall opened a pull request: https://github.com/apache/nifi/pull/570 NIFI-2098 Adding ignore annotation to testManyFilesOpened unit test You can merge this pull request into a Git repository by running: $ git pull https://github.com/JPercivall/nifi NIFI-2098 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/570.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #570 commit f029ac28e8cd8d84d2390f8dbb0b8c01a2e6a37b Author: jpercivall <joeperciv...@yahoo.com> Date: 2016-06-23T17:45:21Z NIFI-2098 Adding ignore annotation to testManyFilesOpened unit test --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #569: NIFI-2097 Changing RemoteProcessGroupStatusTest.java...
GitHub user JPercivall opened a pull request: https://github.com/apache/nifi/pull/569 NIFI-2097 Changing RemoteProcessGroupStatusTest.java to use a random open port ... ... instead of a statically defined one You can merge this pull request into a Git repository by running: $ git pull https://github.com/JPercivall/nifi NIFI-2097 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/569.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #569 commit 81f303a943193a17daa03ae1fa7c5da3c984a057 Author: jpercivall <joeperciv...@yahoo.com> Date: 2016-06-23T16:55:58Z NIFI-2097 Changing RemoteProcessGroupStatusTest to run on a random open port instead of a statically defined one commit b664fc648a9250dc745a7d60d47d6889d8d633cf Author: jpercivall <joeperciv...@yahoo.com> Date: 2016-06-23T17:21:56Z NIFI-2097 Changing RemoteProcessGroupStatusTest.java to use a random open port instead of a statically defined one --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #556: NIFI-615 - Create a processor to extract WAV file character...
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/556 Ah sorry @jskora for misinterpreting it, changes look good. I am gonna do another round of testing and then should be good to merge. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #556: NIFI-615 - Create a processor to extract WAV file character...
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/556 @jskora what about changing it so there is a media and image bundles? Reading this comment[1] it's what we decided and the only way not to break backwards compatibility. [1] https://github.com/apache/nifi/pull/556#discussion_r68044015 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #556: NIFI-615 - Create a processor to extract WAV file character...
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/556 According to the latest comment from @bbende, yes --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #556: NIFI-615 - Create a processor to extract WAV file character...
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/556 The only relevant exception I saw processing the download folder was an xml parse error related to a underlying limitation of Tika, no need to address: 2016-06-22 11:32:19,774 ERROR [Timer-Driven Process Thread-10] o.a.n.p.media.ExtractMediaMetadata ExtractMediaMetadata[id=1a1f93bd-5499-4ab4-8c86-7391f7aa1f19] Failed to extract media metadata from StandardFlowFileRecord[uuid=ea365723-636f-4fb7-b871-f14241d686f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1466609517187-31, container=default, section=31], offset=293539, length=888],offset=0,name=prelude.jspf,size=888] due to org.apache.nifi.processor.exception.ProcessException: IOException thrown from ExtractMediaMetadata[id=1a1f93bd-5499-4ab4-8c86-7391f7aa1f19]: java.io.IOException: org.apache.tika.exception.TikaException: XML parse error: org.apache.nifi.processor.exception.ProcessException: IOException thrown from ExtractMediaMetadata[id=1a1f93bd-5499-4ab4-8c86-7391f7aa1f19]: java.io.IOException: org.apache.tika.exception.TikaException: XML parse error 2016-06-22 11:32:19,776 ERROR [Timer-Driven Process Thread-10] o.a.n.p.media.ExtractMediaMetadata org.apache.nifi.processor.exception.ProcessException: IOException thrown from ExtractMediaMetadata[id=1a1f93bd-5499-4ab4-8c86-7391f7aa1f19]: java.io.IOException: org.apache.tika.exception.TikaException: XML parse error at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1873) ~[na:na] at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[na:na] at org.apache.nifi.processors.media.ExtractMediaMetadata.onTrigger(ExtractMediaMetadata.java:211) ~[nifi-media-processors-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_74] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_74] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_74] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_74] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_74] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_74] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74] Caused by: java.io.IOException: org.apache.tika.exception.TikaException: XML parse error at org.apache.nifi.processors.media.ExtractMediaMetadata$2.process(ExtractMediaMetadata.java:218) ~[nifi-media-processors-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[na:na] ... 14 common frames omitted Caused by: org.apache.tika.exception.TikaException: XML parse error at org.apache.tika.parser.xml.XMLParser.parse(XMLParser.java:78) ~[tika-parsers-1.8.jar:1.8] at org.apache.tika.parser.CompositeParser.parse(CompositeParser.java:281) ~[tika-core-1.8.jar:1.8] at org.apache.tika.parser.CompositeParser.parse(CompositeParser.java:281) ~[tika-core-1.8.jar:1.8] at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:120) ~[tika-core-1.8.jar:1.8] at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:136) ~[tika-core-1.8.jar:1.8] at org.apache.nifi.processors.media.ExtractMediaMetadata.tika_parse(ExtractMediaMetadata.java:245) ~[nifi-media-processors-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] at org.apache.nifi.processors.media.ExtractMediaMetadata.access$000(ExtractMediaMetadata.java:74) ~[nifi-media-processors-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT
[GitHub] nifi issue #556: NIFI-615 - Create a processor to extract WAV file character...
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/556 @markap14 it seems odd to limit the usefulness of provenance for the reason to "keep data in the flow". Information is being created on "why this flowfile failed" but we don't track it in provenance, it seems wrong. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #556: NIFI-615 - Create a processor to extract WAV file character...
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/556 I am trying to parse my downloads folder again and I came across a couple exceptions (will post in other comment) but as I try to figure why it happened I realize that since ExtractMediaMetatadata doesn't emit a provenance event for the failure transfer so don't get any details. It would be nice to include in the catch block: ` session.getProvenanceReporter().route(flowFile, FAILURE, "Failed to extract media metadata due to " + e);` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #556: NIFI-615 - Create a processor to extract WAV file ch...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/556#discussion_r68074121 --- Diff: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.media; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; + +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.sax.BodyContentHandler; +import org.xml.sax.SAXException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"media", "file", "format", "metadata", "audio", "video", "image", "document", "pdf"}) +@CapabilityDescription("Extract the content metadata from flowfiles containing audio, video, image, and other file " ++ "types. This processor relies on the Apache Tika project for file format detection and parsing. It " ++ "extracts a long list of metadata types for media files including audio, video, and print media " ++ "formats." ++ "For the more details and the list of supported file types, visit the library's website " ++ "at http://tika.apache.org/.;) +@WritesAttributes({@WritesAttribute(attribute = ".", description = "The extracted content metadata " ++ "will be inserted with the attribute name \".\", or \"\" if " ++ "\"Metadata Key Prefix\" is not provided.")}) +@SupportsBatching +public class ExtractMediaMetadata extends AbstractProcessor { + +static final PropertyDescriptor MAX_NUMBER_OF_ATTRIBUTES = new PropertyDescriptor.Builder() +.name("Max Number of Attributes") +.description("Specify the max number of attributes to add to the flowfile. There is no guarantee in what order" ++ " the tags will be processed. By default it will process all of them.") +.
[GitHub] nifi pull request #556: NIFI-615 - Create a processor to extract WAV file ch...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/556#discussion_r68056970 --- Diff: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/test/java/org/apache/nifi/processors/media/TestExtractMediaMetadata.java --- @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.media; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestExtractMediaMetadata { + +@Test +public void testProperties() { +final TestRunner runner = TestRunners.newTestRunner(new ExtractMediaMetadata()); +ProcessContext context = runner.getProcessContext(); +Map<PropertyDescriptor, String> propertyValues = context.getProperties(); +assertEquals(6, propertyValues.size()); +} + +@Test +public void testRelationships() { +final TestRunner runner = TestRunners.newTestRunner(new ExtractMediaMetadata()); +ProcessContext context = runner.getProcessContext(); +Set relationships = context.getAvailableRelationships(); +assertEquals(2, relationships.size()); +assertTrue(relationships.contains(ExtractMediaMetadata.SUCCESS)); +assertTrue(relationships.contains(ExtractMediaMetadata.FAILURE)); +} + +@Test +public void testTextBytes() throws IOException { +final TestRunner runner = TestRunners.newTestRunner(new ExtractMediaMetadata()); +runner.setProperty(ExtractMediaMetadata.MIME_TYPE_FILTER, "text/.*"); +runner.setProperty(ExtractMediaMetadata.METADATA_KEY_FILTER, ""); +runner.setProperty(ExtractMediaMetadata.METADATA_KEY_PREFIX, "txt."); +runner.assertValid(); + +final Map<String, String> attrs = new HashMap<>(); +attrs.put("filename", "test1.txt"); +runner.enqueue("test1".getBytes(), attrs); +runner.run(); + +runner.assertAllFlowFilesTransferred(ExtractMediaMetadata.SUCCESS, 1); +runner.assertTransferCount(ExtractMediaMetadata.FAILURE, 0); + +final List successFiles = runner.getFlowFilesForRelationship(ExtractMediaMetadata.SUCCESS); +MockFlowFile flowFile0 = successFiles.get(0); +flowFile0.assertAttributeExists("filename"); +flowFile0.assertAttributeEquals("filename", "test1.txt"); +flowFile0.assertAttributeExists("txt.Content-Type"); + assertTrue(flowFile0.getAttribute("txt.Content-Type").startsWith("text/plain")); +flowFile0.assertAttributeExists("txt.X-Parsed-By"); + assertTrue(flowFile0.getAttribute("txt.X-Parsed-By").contains("org.apache.tika.parser.DefaultParser")); + assertTrue(flowFile0.getAttribute("txt.X-Parsed-By").contains("org.apache.tika.parser.txt.TXTParser")); +flowFile0.assertAttributeExists("txt.Content-Encoding"); +flowFile0.assertAttributeEquals("txt.Content-Encoding", "ISO-8859-1"); +flowFile0.assertContentEquals("test1".getBytes("UTF-8")); +
[GitHub] nifi pull request #556: NIFI-615 - Create a processor to extract WAV file ch...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/556#discussion_r68052320 --- Diff: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.media; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; + +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.sax.BodyContentHandler; +import org.xml.sax.SAXException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"media", "file", "format", "metadata", "audio", "video", "image", "document", "pdf"}) +@CapabilityDescription("Extract the content metadata from flowfiles containing audio, video, image, and other file " ++ "types. This processor relies on the Apache Tika project for file format detection and parsing. It " ++ "extracts a long list of metadata types for media files including audio, video, and print media " ++ "formats." ++ "For the more details and the list of supported file types, visit the library's website " ++ "at http://tika.apache.org/.;) +@WritesAttributes({@WritesAttribute(attribute = ".", description = "The extracted content metadata " ++ "will be inserted with the attribute name \".\", or \"\" if " ++ "\"Metadata Key Prefix\" is not provided.")}) +@SupportsBatching +public class ExtractMediaMetadata extends AbstractProcessor { + +static final PropertyDescriptor MAX_NUMBER_OF_ATTRIBUTES = new PropertyDescriptor.Builder() +.name("Max Number of Attributes") +.description("Specify the max number of attributes to add to the flowfile. There is no guarantee in what order" ++ " the tags will be processed. By default it will process all of them.") +.
[GitHub] nifi pull request #556: NIFI-615 - Create a processor to extract WAV file ch...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/556#discussion_r67982895 --- Diff: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.media; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; + +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.sax.BodyContentHandler; +import org.xml.sax.SAXException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"media", "file", "format", "metadata", "audio", "video", "image", "document", "pdf"}) +@CapabilityDescription("Extract the content metadata from flowfiles containing audio, video, image, and other file " ++ "types. This processor relies on the Apache Tika project for file format detection and parsing. It " ++ "extracts a long list of metadata types for media files including audio, video, and print media " ++ "formats." ++ "For the more details and the list of supported file types, visit the library's website " ++ "at http://tika.apache.org/.;) +@WritesAttributes({@WritesAttribute(attribute = ".", description = "The extracted content metadata " ++ "will be inserted with the attribute name \".\", or \"\" if " ++ "\"Metadata Key Prefix\" is not provided.")}) +@SupportsBatching +public class ExtractMediaMetadata extends AbstractProcessor { + +static final PropertyDescriptor MAX_NUMBER_OF_ATTRIBUTES = new PropertyDescriptor.Builder() +.name("Max Number of Attributes") +.description("Specify the max number of attributes to add to the flowfile. There is no guarantee in what order" ++ " the tags will be processed. By default it will process all of them.") +.
[GitHub] nifi pull request #556: NIFI-615 - Create a processor to extract WAV file ch...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/556#discussion_r67969885 --- Diff: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/test/java/org/apache/nifi/processors/media/TestExtractMediaMetadata.java --- @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.media; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestExtractMediaMetadata { + +@Test +public void testProperties() { +final TestRunner runner = TestRunners.newTestRunner(new ExtractMediaMetadata()); +ProcessContext context = runner.getProcessContext(); +Map<PropertyDescriptor, String> propertyValues = context.getProperties(); +assertEquals(6, propertyValues.size()); +} + +@Test +public void testRelationships() { +final TestRunner runner = TestRunners.newTestRunner(new ExtractMediaMetadata()); +ProcessContext context = runner.getProcessContext(); +Set relationships = context.getAvailableRelationships(); +assertEquals(2, relationships.size()); +assertTrue(relationships.contains(ExtractMediaMetadata.SUCCESS)); +assertTrue(relationships.contains(ExtractMediaMetadata.FAILURE)); +} + +@Test +public void testTextBytes() throws IOException { +final TestRunner runner = TestRunners.newTestRunner(new ExtractMediaMetadata()); +runner.setProperty(ExtractMediaMetadata.MIME_TYPE_FILTER, "text/.*"); +runner.setProperty(ExtractMediaMetadata.METADATA_KEY_FILTER, ""); +runner.setProperty(ExtractMediaMetadata.METADATA_KEY_PREFIX, "txt."); +runner.assertValid(); + +final Map<String, String> attrs = new HashMap<>(); +attrs.put("filename", "test1.txt"); +runner.enqueue("test1".getBytes(), attrs); +runner.run(); + +runner.assertAllFlowFilesTransferred(ExtractMediaMetadata.SUCCESS, 1); +runner.assertTransferCount(ExtractMediaMetadata.FAILURE, 0); + +final List successFiles = runner.getFlowFilesForRelationship(ExtractMediaMetadata.SUCCESS); +MockFlowFile flowFile0 = successFiles.get(0); +flowFile0.assertAttributeExists("filename"); +flowFile0.assertAttributeEquals("filename", "test1.txt"); +flowFile0.assertAttributeExists("txt.Content-Type"); + assertTrue(flowFile0.getAttribute("txt.Content-Type").startsWith("text/plain")); +flowFile0.assertAttributeExists("txt.X-Parsed-By"); + assertTrue(flowFile0.getAttribute("txt.X-Parsed-By").contains("org.apache.tika.parser.DefaultParser")); + assertTrue(flowFile0.getAttribute("txt.X-Parsed-By").contains("org.apache.tika.parser.txt.TXTParser")); +flowFile0.assertAttributeExists("txt.Content-Encoding"); +flowFile0.assertAttributeEquals("txt.Content-Encoding", "ISO-8859-1"); +flowFile0.assertContentEquals("test1".getBytes("UTF-8")); +
[GitHub] nifi pull request #556: NIFI-615 - Create a processor to extract WAV file ch...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/556#discussion_r67964066 --- Diff: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.media; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; + +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.sax.BodyContentHandler; +import org.xml.sax.SAXException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"media", "file", "format", "metadata", "audio", "video", "image", "document", "pdf"}) +@CapabilityDescription("Extract the content metadata from flowfiles containing audio, video, image, and other file " ++ "types. This processor relies on the Apache Tika project for file format detection and parsing. It " ++ "extracts a long list of metadata types for media files including audio, video, and print media " ++ "formats." ++ "For the more details and the list of supported file types, visit the library's website " ++ "at http://tika.apache.org/.;) +@WritesAttributes({@WritesAttribute(attribute = ".", description = "The extracted content metadata " --- End diff -- Here it has "." making it seem like there is automatically a "." added but in the property descriptor it says the "." or "-" is not automatically added. I agree that we shouldn't lock the user into using ".", "-", etc. so this should be changed to reflect that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #556: NIFI-615 - Create a processor to extract WAV file ch...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/556#discussion_r67963557 --- Diff: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.media; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; + +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.sax.BodyContentHandler; +import org.xml.sax.SAXException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"media", "file", "format", "metadata", "audio", "video", "image", "document", "pdf"}) +@CapabilityDescription("Extract the content metadata from flowfiles containing audio, video, image, and other file " ++ "types. This processor relies on the Apache Tika project for file format detection and parsing. It " ++ "extracts a long list of metadata types for media files including audio, video, and print media " ++ "formats." ++ "For the more details and the list of supported file types, visit the library's website " ++ "at http://tika.apache.org/.;) +@WritesAttributes({@WritesAttribute(attribute = ".", description = "The extracted content metadata " ++ "will be inserted with the attribute name \".\", or \"\" if " ++ "\"Metadata Key Prefix\" is not provided.")}) +@SupportsBatching +public class ExtractMediaMetadata extends AbstractProcessor { + +static final PropertyDescriptor MAX_NUMBER_OF_ATTRIBUTES = new PropertyDescriptor.Builder() +.name("Max Number of Attributes") +.description("Specify the max number of attributes to add to the flowfile. There is no guarantee in what order" ++ " the tags will be processed. By default it will process all of them.") +.
[GitHub] nifi pull request #556: NIFI-615 - Create a processor to extract WAV file ch...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/556#discussion_r67963320 --- Diff: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.media; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; + +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.sax.BodyContentHandler; +import org.xml.sax.SAXException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"media", "file", "format", "metadata", "audio", "video", "image", "document", "pdf"}) +@CapabilityDescription("Extract the content metadata from flowfiles containing audio, video, image, and other file " ++ "types. This processor relies on the Apache Tika project for file format detection and parsing. It " ++ "extracts a long list of metadata types for media files including audio, video, and print media " ++ "formats." ++ "For the more details and the list of supported file types, visit the library's website " ++ "at http://tika.apache.org/.;) +@WritesAttributes({@WritesAttribute(attribute = ".", description = "The extracted content metadata " ++ "will be inserted with the attribute name \".\", or \"\" if " ++ "\"Metadata Key Prefix\" is not provided.")}) +@SupportsBatching +public class ExtractMediaMetadata extends AbstractProcessor { + +static final PropertyDescriptor MAX_NUMBER_OF_ATTRIBUTES = new PropertyDescriptor.Builder() +.name("Max Number of Attributes") +.description("Specify the max number of attributes to add to the flowfile. There is no guarantee in what order" ++ " the tags will be processed. By default it will process all of them.") +.
[GitHub] nifi pull request #556: NIFI-615 - Create a processor to extract WAV file ch...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/556#discussion_r67963044 --- Diff: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.media; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; + +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.sax.BodyContentHandler; +import org.xml.sax.SAXException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"media", "file", "format", "metadata", "audio", "video", "image", "document", "pdf"}) +@CapabilityDescription("Extract the content metadata from flowfiles containing audio, video, image, and other file " ++ "types. This processor relies on the Apache Tika project for file format detection and parsing. It " ++ "extracts a long list of metadata types for media files including audio, video, and print media " ++ "formats." ++ "For the more details and the list of supported file types, visit the library's website " ++ "at http://tika.apache.org/.;) +@WritesAttributes({@WritesAttribute(attribute = ".", description = "The extracted content metadata " ++ "will be inserted with the attribute name \".\", or \"\" if " ++ "\"Metadata Key Prefix\" is not provided.")}) +@SupportsBatching +public class ExtractMediaMetadata extends AbstractProcessor { + +static final PropertyDescriptor MAX_NUMBER_OF_ATTRIBUTES = new PropertyDescriptor.Builder() +.name("Max Number of Attributes") +.description("Specify the max number of attributes to add to the flowfile. There is no guarantee in what order" ++ " the tags will be processed. By default it will process all of them.") +.
[GitHub] nifi pull request #548: NIFI-2045 - Removing mqtt message from queue after a...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/548#discussion_r67957184 --- Diff: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java --- @@ -288,9 +289,13 @@ public void process(final OutputStream out) throws IOException { String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString(); session.getProvenanceReporter().receive(messageFlowfile, transitUri); +String uuid = messageFlowfile.getAttribute(CoreAttributes.UUID.key()); session.transfer(messageFlowfile, REL_MESSAGE); -mqttQueue.remove(mqttMessage); session.commit(); +if (!mqttQueue.remove(mqttMessage)) { --- End diff -- In order to avoid concatenating the Strings in the logger message needlessly, this should check if warn is enabled too: `... && getLogger().isWarnEnabled()` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #548: NIFI-2045 - Removing mqtt message from queue after a...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/548#discussion_r67957067 --- Diff: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java --- @@ -288,9 +289,13 @@ public void process(final OutputStream out) throws IOException { String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString(); session.getProvenanceReporter().receive(messageFlowfile, transitUri); +String uuid = messageFlowfile.getAttribute(CoreAttributes.UUID.key()); --- End diff -- A bit of a nit pick, this is only referenced in the logger statement below so it should be created in there, not outside of it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #555: NIFI-2067 ignored intermittently failing MemoryTest
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/555 Nope I am missing it, sorry about that. Once the travis build finishes successfully I am +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #555: NIFI-2067 ignored intermittently failing MemoryTest
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/555 In cases where validateWarnWhenPercentThresholdReached fails would validateWarnWhenSizeThresholdReached fail too? Wondering if we should ignore both. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/543 Also, for the sake of transparency, we had a discussion offline debating the various directions to go to handle the "consistent inconsistency" and had the idea for a way for the user to check a box in the UI for whether to enter literal or interpreted text. I created a ticket for it here[1]. For now, in order to keep consistent with the other processors in this area that do automatically convert, the PutTCP processor will convert a "\n" entered in the UI to a new line character (also tabs and carriage returns). [1] https://issues.apache.org/jira/browse/NIFI-2069 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/543 Thanks @olegz, will merge it in --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67801359 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; + +import javax.net.ssl.SSLContext; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + *
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67800576 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; + +import javax.net.ssl.SSLContext; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + *
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67799164 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; + +import javax.net.ssl.SSLContext; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + *
[GitHub] nifi issue #252: NIFI-615 - Create a processor to extract WAV file character...
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/252 @jskora were you able to rebase and open a new PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67782966 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; + +import javax.net.ssl.SSLContext; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + *
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67782856 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; + +import javax.net.ssl.SSLContext; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + *
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r6337 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java --- @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.processors.standard.util.TestPutTCPCommon; + +public class TestPutTCP extends TestPutTCPCommon { + +public TestPutTCP() { +super(); +ssl = false; +} + +@Override +public void configureProperties(String host, int port, String outgoingMessageDelimiter, boolean connectionPerFlowFile, boolean expectValid) { +runner.setProperty(PutTCP.HOSTNAME, host); +runner.setProperty(PutTCP.PORT, Integer.toString(port)); +if (outgoingMessageDelimiter != null) { +runner.setProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER, outgoingMessageDelimiter); +} +if (connectionPerFlowFile) { +runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, "true"); +} else { +runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, "false"); +} --- End diff -- In general, I like the IF/ELSE block better but actually I think neither are appropriate here and it would better to just do `String.valueOf(connectionPerFlowFile)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67776824 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; + +import javax.net.ssl.SSLContext; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + *
[GitHub] nifi issue #542: NIFI-1895 for 0.x
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/542 Specifically looking at Bryan's commit: I looked through the code and built locally (since travis failed for un-related issues) and it looks good. +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #542: NIFI-1895 for 0.x
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/542#discussion_r67723205 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java --- @@ -89,6 +89,25 @@ .defaultValue(COMPLEX_FIELD_TEXT.getValue()) .build(); +protected static final String STRING_ENCODING_VALUE = "String"; +protected static final String BYTES_ENCODING_VALUE = "Bytes"; + +protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, +"Stores the value of each field as a UTF-8 String."); --- End diff -- That works for me. That's fair, if questions come in to the mailing list asking to clarify then we can address the description. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67722269 --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java --- @@ -119,7 +119,24 @@ .defaultValue("10 seconds") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); - +public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder() +.name("Outgoing Message Delimiter") +.description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message " ++ "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should " ++ "ensure that the FlowFile content does not contain the delimiter character to avoid errors. If it is not possible to define a delimiter " ++ "character that is not present in the FlowFile content then the user must use another processor to change the encoding of the data e.g. Base64 " ++ "encoding.") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(true) +.build(); +public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder() --- End diff -- While I'm not sure about use-cases I don't see a problem adding it as a property which defaults to false. I assume @aviewfromspace1 had a specific use-case in mind when adding the property and since it is optional I like keeping it. Also PutTCP is marked as "TriggerWhenEmpty" so it will properly pruneIdleSenders just like PutSplunk. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #542: NIFI-1895 for 0.x
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/542#discussion_r67714535 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java --- @@ -89,6 +89,25 @@ .defaultValue(COMPLEX_FIELD_TEXT.getValue()) .build(); +protected static final String STRING_ENCODING_VALUE = "String"; +protected static final String BYTES_ENCODING_VALUE = "Bytes"; + +protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, +"Stores the value of each field as a UTF-8 String."); --- End diff -- Secondly, will users configuring this processor know what a "String" is (aka. is it a concept that is used regularly when configuring HBase)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #542: NIFI-1895 for 0.x
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/542#discussion_r67713339 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java --- @@ -89,6 +89,25 @@ .defaultValue(COMPLEX_FIELD_TEXT.getValue()) .build(); +protected static final String STRING_ENCODING_VALUE = "String"; +protected static final String BYTES_ENCODING_VALUE = "Bytes"; + +protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, +"Stores the value of each field as a UTF-8 String."); --- End diff -- Should the charset be selectable or will it always be UTF-8? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67707758 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.stream.io.StreamUtils; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter/b> - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + * + * failure - Where to route FlowFiles that failed to be sent. + * success - Where to route FlowFiles after the
[GitHub] nifi pull request #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/543#discussion_r67707310 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java --- @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.stream.io.StreamUtils; + +/** + * + * The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP + * connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can + * be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the + * behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should + * only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets. + * + * + * + * This processor has the following required properties: + * + * Hostname - The IP address or host name of the destination TCP server. + * Port - The TCP port of the destination TCP server. + * + * + * + * + * This processor has the following optional properties: + * + * Connection Per FlowFile - Specifies that each FlowFiles content will be transmitted on a separate TCP connection. + * Idle Connection Expiration - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout. + * Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped. + * Outgoing Message Delimiter/b> - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server. + * Timeout - The timeout period for determining an error has occurred whilst connecting or sending data. + * + * + * + * + * The following relationships are required: + * + * failure - Where to route FlowFiles that failed to be sent. + * success - Where to route FlowFiles after the
[GitHub] nifi issue #272: NIFI-1620 Allow empty Content-Type in InvokeHTTP processor
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/272 Thanks for the assist @apiri --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi issue #543: NIFI-1834 Create PutTCP Processor
Github user JPercivall commented on the issue: https://github.com/apache/nifi/pull/543 I agree with @joewitt --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---