[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-197151921 @mattyb149 has it. @mans2singh the updates look great. Concerning the validator, how does 6bed3bcaad61c516eacf268400cdd03c0cd58dae look to you? Otherwise, I think we can send this one on its way. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Re: quick reminder on new nars
Would certainly like to better understand what you have in mind. thanks On Wed, Mar 16, 2016 at 12:02 AM, Sean Busbeywrote: > we could make a parent pom for all the nar modules. > > wanna see what that looks like? > > On Tue, Mar 15, 2016 at 8:46 PM, Joe Witt wrote: >> Team, >> >> During the previous build/release cycle it was found that >> javadocs/sources were being made for the Nar bundles themselves and >> was causing invalid licensing/notice information to be present. All >> the existing bundles and the archetypes were fixed for this. Just be >> sure on new nars to include these as well if you aren't copying from >> something existing or using the archetype. I just fixed a couple of >> them for new things in the 0.6.0 release. >> >> The nar pom itself should have a properties section such as >> >> >> true >> true >> >> >> Perhaps there is a nicer maven way of ensuring this doesn't happen for Nars. >> >> Thanks >> Joe
Re: quick reminder on new nars
we could make a parent pom for all the nar modules. wanna see what that looks like? On Tue, Mar 15, 2016 at 8:46 PM, Joe Wittwrote: > Team, > > During the previous build/release cycle it was found that > javadocs/sources were being made for the Nar bundles themselves and > was causing invalid licensing/notice information to be present. All > the existing bundles and the archetypes were fixed for this. Just be > sure on new nars to include these as well if you aren't copying from > something existing or using the archetype. I just fixed a couple of > them for new things in the 0.6.0 release. > > The nar pom itself should have a properties section such as > > > true > true > > > Perhaps there is a nicer maven way of ensuring this doesn't happen for Nars. > > Thanks > Joe
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56276158 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type.") +@ReadsAttributes({ +@ReadsAttribute(attribute = "cql.args.N.type", +description = "Incoming FlowFiles are expected to be parameterized CQL statements. The type of each " ++ "parameter is specified as a lowercase string corresponding to the Cassandra data type (text, " ++ "int, boolean, e.g.). In the case of collections, the primitive type(s) of the elements in the " ++ "collection should be comma-delimited, follow the collection type, and be enclosed in
[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user mattyb149 commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-197125258 @mans2singh I think Aldrin was referring to DATA_SIZE_VALIDATOR in org.apache.nifi.processor.util.StandardValidators. If you need custom min/max size (in bytes), you can use StandardValidators.createDataSizeBoundsValidator(minBytesInclusive, maxBytesInclusive) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user mans2singh commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-197122905 @aldrin - I've moved the check for buffer size and rebased as you had recommended. I could not find the data size validator that you have mentioned though. Please let me know if you have any other recommendation. Thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...
Github user olegz commented on the pull request: https://github.com/apache/nifi/pull/271#issuecomment-197121747 @trkurc Without knowing how much spring experience you have it's hard to say, but I do have a sample app which I use for testing that I can push to my github and you (anyone) can use to play around. Will do it tomorrow after some other pressing things. Also, I plan to add one minor feature. Will describe it in the next commit message as well as updated docs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...
Github user trkurc commented on the pull request: https://github.com/apache/nifi/pull/271#issuecomment-197119997 @oleg: changes look good. do you have a recommendation for building a sample "application" for testing other than adapting what is in the unit tests? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1575: Add QueryDatabaseTable processor
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/261#discussion_r56274269 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java --- @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." ++ " Streaming is used so arbitrarily large result sets are supported. This
quick reminder on new nars
Team, During the previous build/release cycle it was found that javadocs/sources were being made for the Nar bundles themselves and was causing invalid licensing/notice information to be present. All the existing bundles and the archetypes were fixed for this. Just be sure on new nars to include these as well if you aren't copying from something existing or using the archetype. I just fixed a couple of them for new things in the 0.6.0 release. The nar pom itself should have a properties section such as true true Perhaps there is a nicer maven way of ensuring this doesn't happen for Nars. Thanks Joe
Re: [ANNOUNCE] New Apache NiFi Committer Oleg Zhurakousky
Congrats Oleg! Sent from my iPhone > On Mar 15, 2016, at 9:01 PM, Ricky Saltzerwrote: > > Big congrats! >> On Mar 15, 2016 8:42 PM, "Michael Moser" wrote: >> >> Yes, welcome Oleg! Your hard work is very much appreciated by everyone. >> >> -- Mike >> >> >>> On Tue, Mar 15, 2016 at 8:22 PM, Matt Burgess wrote: >>> >>> Congratulations! Well deserved. >>> On Mar 15, 2016, at 8:17 PM, Tony Kurc wrote: On behalf of the Apache NiFI PMC, I am very pleased to announce that >> Oleg Zhurakousky has accepted the PMC's invitation to become a committer on >>> the Apache NiFi project. We greatly appreciate all of Oleg's hard work and generous contributions to the project. We look forward to his continued involvement in the project. Some highlights of Oleg's contributions include identifying and >>> mitigating some challenging framework bugs, amqp support in 0.5.x, and some pretty exciting spring support slated for 0.6.x. As I have noticed, many of >> you have already had the opportunity to interact with Oleg on the mailing lists, the jiras, and pull request comments. I, for one, look forward >> to the continued community support! Welcome and congratulations! Tony >>
Re: Rollbacks
Devin, So the session follows the unit of work pattern that Martin Fowler describes [1]. The idea here is that whether you access one flow file or many flow files or anything in between you are doing some session of work. You can commit all the things you do on that session or rollback all the things you do on that session. This pattern/concept provides a nice and safe environment in which the contract between the developer and framework is well understood. Generally, rollback is only necessary when some unplanned or unexpected exception occurs and it is largely there so that the framework can ensure all things are returned to a safe state. It can also do things like penalize that processor/extension so that if it is some programming error that it will reduce its impact on the system overall. So, with that said there are two ways to think about your case. It appears you are doing your own batching and probably this is for higher throughput and also it appears you'd really like to treat each flow file independently in terms of logic/handling. This is precisely why in addition to this nice clean unit of work pattern we also support automated session batching (this is what Andrew was referring to). In this mode you can add an annotation to your processor called @SupportsBatching which signals to the framework that it may attempt to automatically combine subsequent calls to commit into small batches and commit them in a single batch. In this way you can build your processor in a very simple single flow file sort of manner and call commit. But the framework will combine a series of commits in a very small time window together to get higher throughput. In the UI a user can signal their willingness to let the framework to do this and acknowledge that they may be trading off some small latency in favor of higher throughput. Now there are some additional things to think about when using this. For instance, it is best used when the processor and its function is side effect free meaning that there are no external system state changes or things like that. In this sense you can think of the processor you're building as idempotent (as the REST folks like to say). If your processor fits that then SupportsBatching can have really powerful results. Now, you also mention that some flow files you'd consider failures and others you'd consider something else, presumably success. This is perfect and very common and does not require a rollback. Keep rollback in mind for bad stuff that can happen that you don't plan for. For the scenario of failures that you can predict such as invalid data or invalid state of something you actually want to have a failure relationship on that processor and simply route things there. >From a 'developer' perspective this is not a rollback case. "Failure" then is as planned for and expected as "success". So you go ahead and route the flowfile to failure and call commit. All good. It is the person designing this loosely coupled and highly cohesive set of components together in a flow that gets to decide what failure means for their context. Lots of info here and probably not well written or with big gaps. You're asking the right questions so just keep asking. Lots of folks here that want to help. [1] http://martinfowler.com/eaaCatalog/unitOfWork.html [2] https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#session-rollback Thanks Joe On Tue, Mar 15, 2016 at 7:25 PM, Devin Fisherwrote: > Thanks for your reply. I'm sorry if my question seems confusing. I'm still > learning how nifi works. I don't have any understand about how the > framework works on the back end and incomplete understanding of the exposed > interface. From my point view (an external process developer) asking to > rollback the one flow file that failed (I don't want changes made to it > incompletely) and lets the other n flowfiles move on seems reasonable. But > I don't know what is happening in the session on the back end. > > I likely don't really understand what happens on a rollback. Reading the > developer's guide I got the impression that rollback disregards all changes > made the session include transfers. It then returns the flowfiles to the > queue. It would seem that a session is really finished and not usable after > a rollback. So, I then don't understand how I can do my use case. I want to > rollback (undo changes to a single flow file that failed) and then transfer > it to the Failed relationship unchanged (or add the discard.reason to the > attributes). > > I assume you mean "Run duration" when you refer to the 'scheduling' tab. I > would love to understand better how that works. In the documentation, I > only see a note about it in the User guide. But the developer's guide is > silent. I don't see how that slider is enforced in the processor code. It > seems that once the framework has ceded control to the processor it can run > for as
Re: Cross NAR Controller Services
Thanks Bryan. That should give me plenty to work on tomorrow. I'll write back if I can't figure it out. Devin On Tue, Mar 15, 2016 at 6:11 PM, Bryan Bendewrote: > Devin, > > This WIki page shows how to create the appropriate dependencies between > your NAR and the ControllerService: > > > https://cwiki.apache.org/confluence/display/NIFI/Maven+Projects+for+Extensions#MavenProjectsforExtensions-LinkingProcessorsandControllerServices > > I also created an example project on GitHub to show a working example: > https://github.com/bbende/nifi-dependency-example > > Hope that helps. > > -Bryan > > On Tue, Mar 15, 2016 at 7:33 PM, Oleg Zhurakousky < > ozhurakou...@hortonworks.com> wrote: > > > Devin > > > > Your problem is most likely in your NAR poms where you may satisfy > compile > > dependency but not NAR to participate in class loader runtime > inheritance. > > Is there a way to look at your poms and also the general structure of the > > project? > > > > Oleg > > > > Sent from my iPhone > > > > > On Mar 15, 2016, at 18:51, Devin Fisher < > > devin.fis...@perfectsearchcorp.com> wrote: > > > > > > I'm having issues using a standard controller service > > (DBCPConnectionPool) > > > that is provided by nifi-dbcp-service-nar. But I'm having issues with > my > > > nar. I have included a dependency on nifi-dbcp-service-api in my maven > > pom > > > and have used the property description that is the same as ExecuteSQL > for > > > the DBCP_SERVICE property. When I load my processor in nifi I don't > get a > > > list of DBCPConnectionPool controller service like I expect. I have an > > > ExecuteSQL processor in the same flow (for testing) and it list the > > > controller service I created just fine and uses it just fine. > > > > > > The problem seems to me (I don't have a development environment to > > confirm) > > > that the DBCPService.class that I use in my processor is not seen as > the > > > same class object (because of the isolation features of NAR) as the one > > > that DBCPCOnnectionPool implements. I think I have mostly confirmed > this > > by > > > implementing a dummy controller service that implements DBCPService in > > the > > > same NAR as my processor and my processor is able to list it just fine. > > But > > > the ExecuteSQL don't list my dummy controller service. So they seem to > be > > > considered different classes. > > > > > > I think I'm doing something wrong because ExecuteSQL is not in the same > > nar > > > as DBCPConnectionPool. So they play nice together somehow but I don't > see > > > what I need to do so that my nar works the same way. > > > > > > I'm enjoying developing against nifi and sorry if this is a rookie > > mistake. > > > > > > Devin > > >
Re: [ANNOUNCE] New Apache NiFi Committer Oleg Zhurakousky
Big congrats! On Mar 15, 2016 8:42 PM, "Michael Moser"wrote: > Yes, welcome Oleg! Your hard work is very much appreciated by everyone. > > -- Mike > > > On Tue, Mar 15, 2016 at 8:22 PM, Matt Burgess wrote: > > > Congratulations! Well deserved. > > > > > On Mar 15, 2016, at 8:17 PM, Tony Kurc wrote: > > > > > > On behalf of the Apache NiFI PMC, I am very pleased to announce that > Oleg > > > Zhurakousky has accepted the PMC's invitation to become a committer on > > the > > > Apache NiFi project. We greatly appreciate all of Oleg's hard work and > > > generous contributions to the project. We look forward to his continued > > > involvement in the project. > > > > > > Some highlights of Oleg's contributions include identifying and > > mitigating > > > some challenging framework bugs, amqp support in 0.5.x, and some pretty > > > exciting spring support slated for 0.6.x. As I have noticed, many of > you > > > have already had the opportunity to interact with Oleg on the mailing > > > lists, the jiras, and pull request comments. I, for one, look forward > to > > > the continued community support! > > > > > > Welcome and congratulations! > > > > > > Tony > > >
Re: [ANNOUNCE] New Apache NiFi Committer Oleg Zhurakousky
Yes, welcome Oleg! Your hard work is very much appreciated by everyone. -- Mike On Tue, Mar 15, 2016 at 8:22 PM, Matt Burgesswrote: > Congratulations! Well deserved. > > > On Mar 15, 2016, at 8:17 PM, Tony Kurc wrote: > > > > On behalf of the Apache NiFI PMC, I am very pleased to announce that Oleg > > Zhurakousky has accepted the PMC's invitation to become a committer on > the > > Apache NiFi project. We greatly appreciate all of Oleg's hard work and > > generous contributions to the project. We look forward to his continued > > involvement in the project. > > > > Some highlights of Oleg's contributions include identifying and > mitigating > > some challenging framework bugs, amqp support in 0.5.x, and some pretty > > exciting spring support slated for 0.6.x. As I have noticed, many of you > > have already had the opportunity to interact with Oleg on the mailing > > lists, the jiras, and pull request comments. I, for one, look forward to > > the continued community support! > > > > Welcome and congratulations! > > > > Tony >
Re: [ANNOUNCE] New Apache NiFi Committer Oleg Zhurakousky
Congratulations! Well deserved. > On Mar 15, 2016, at 8:17 PM, Tony Kurcwrote: > > On behalf of the Apache NiFI PMC, I am very pleased to announce that Oleg > Zhurakousky has accepted the PMC's invitation to become a committer on the > Apache NiFi project. We greatly appreciate all of Oleg's hard work and > generous contributions to the project. We look forward to his continued > involvement in the project. > > Some highlights of Oleg's contributions include identifying and mitigating > some challenging framework bugs, amqp support in 0.5.x, and some pretty > exciting spring support slated for 0.6.x. As I have noticed, many of you > have already had the opportunity to interact with Oleg on the mailing > lists, the jiras, and pull request comments. I, for one, look forward to > the continued community support! > > Welcome and congratulations! > > Tony
[ANNOUNCE] New Apache NiFi Committer Oleg Zhurakousky
On behalf of the Apache NiFI PMC, I am very pleased to announce that Oleg Zhurakousky has accepted the PMC's invitation to become a committer on the Apache NiFi project. We greatly appreciate all of Oleg's hard work and generous contributions to the project. We look forward to his continued involvement in the project. Some highlights of Oleg's contributions include identifying and mitigating some challenging framework bugs, amqp support in 0.5.x, and some pretty exciting spring support slated for 0.6.x. As I have noticed, many of you have already had the opportunity to interact with Oleg on the mailing lists, the jiras, and pull request comments. I, for one, look forward to the continued community support! Welcome and congratulations! Tony
Re: Cross NAR Controller Services
Devin, This WIki page shows how to create the appropriate dependencies between your NAR and the ControllerService: https://cwiki.apache.org/confluence/display/NIFI/Maven+Projects+for+Extensions#MavenProjectsforExtensions-LinkingProcessorsandControllerServices I also created an example project on GitHub to show a working example: https://github.com/bbende/nifi-dependency-example Hope that helps. -Bryan On Tue, Mar 15, 2016 at 7:33 PM, Oleg Zhurakousky < ozhurakou...@hortonworks.com> wrote: > Devin > > Your problem is most likely in your NAR poms where you may satisfy compile > dependency but not NAR to participate in class loader runtime inheritance. > Is there a way to look at your poms and also the general structure of the > project? > > Oleg > > Sent from my iPhone > > > On Mar 15, 2016, at 18:51, Devin Fisher < > devin.fis...@perfectsearchcorp.com> wrote: > > > > I'm having issues using a standard controller service > (DBCPConnectionPool) > > that is provided by nifi-dbcp-service-nar. But I'm having issues with my > > nar. I have included a dependency on nifi-dbcp-service-api in my maven > pom > > and have used the property description that is the same as ExecuteSQL for > > the DBCP_SERVICE property. When I load my processor in nifi I don't get a > > list of DBCPConnectionPool controller service like I expect. I have an > > ExecuteSQL processor in the same flow (for testing) and it list the > > controller service I created just fine and uses it just fine. > > > > The problem seems to me (I don't have a development environment to > confirm) > > that the DBCPService.class that I use in my processor is not seen as the > > same class object (because of the isolation features of NAR) as the one > > that DBCPCOnnectionPool implements. I think I have mostly confirmed this > by > > implementing a dummy controller service that implements DBCPService in > the > > same NAR as my processor and my processor is able to list it just fine. > But > > the ExecuteSQL don't list my dummy controller service. So they seem to be > > considered different classes. > > > > I think I'm doing something wrong because ExecuteSQL is not in the same > nar > > as DBCPConnectionPool. So they play nice together somehow but I don't see > > what I need to do so that my nar works the same way. > > > > I'm enjoying developing against nifi and sorry if this is a rookie > mistake. > > > > Devin >
Re: Cross NAR Controller Services
Devin Your problem is most likely in your NAR poms where you may satisfy compile dependency but not NAR to participate in class loader runtime inheritance. Is there a way to look at your poms and also the general structure of the project? Oleg Sent from my iPhone > On Mar 15, 2016, at 18:51, Devin Fisher> wrote: > > I'm having issues using a standard controller service (DBCPConnectionPool) > that is provided by nifi-dbcp-service-nar. But I'm having issues with my > nar. I have included a dependency on nifi-dbcp-service-api in my maven pom > and have used the property description that is the same as ExecuteSQL for > the DBCP_SERVICE property. When I load my processor in nifi I don't get a > list of DBCPConnectionPool controller service like I expect. I have an > ExecuteSQL processor in the same flow (for testing) and it list the > controller service I created just fine and uses it just fine. > > The problem seems to me (I don't have a development environment to confirm) > that the DBCPService.class that I use in my processor is not seen as the > same class object (because of the isolation features of NAR) as the one > that DBCPCOnnectionPool implements. I think I have mostly confirmed this by > implementing a dummy controller service that implements DBCPService in the > same NAR as my processor and my processor is able to list it just fine. But > the ExecuteSQL don't list my dummy controller service. So they seem to be > considered different classes. > > I think I'm doing something wrong because ExecuteSQL is not in the same nar > as DBCPConnectionPool. So they play nice together somehow but I don't see > what I need to do so that my nar works the same way. > > I'm enjoying developing against nifi and sorry if this is a rookie mistake. > > Devin
Re: Rollbacks
Thanks for your reply. I'm sorry if my question seems confusing. I'm still learning how nifi works. I don't have any understand about how the framework works on the back end and incomplete understanding of the exposed interface. From my point view (an external process developer) asking to rollback the one flow file that failed (I don't want changes made to it incompletely) and lets the other n flowfiles move on seems reasonable. But I don't know what is happening in the session on the back end. I likely don't really understand what happens on a rollback. Reading the developer's guide I got the impression that rollback disregards all changes made the session include transfers. It then returns the flowfiles to the queue. It would seem that a session is really finished and not usable after a rollback. So, I then don't understand how I can do my use case. I want to rollback (undo changes to a single flow file that failed) and then transfer it to the Failed relationship unchanged (or add the discard.reason to the attributes). I assume you mean "Run duration" when you refer to the 'scheduling' tab. I would love to understand better how that works. In the documentation, I only see a note about it in the User guide. But the developer's guide is silent. I don't see how that slider is enforced in the processor code. It seems that once the framework has ceded control to the processor it can run for as long as it wants. So more information about this would be great. Thanks again for the response. The information is always useful and enlighting. Devin On Tue, Mar 15, 2016 at 4:26 PM, Andrew Grandewrote: > Devin, > > What you're asking for is a contradicting requirement. One trades > individual message transactional control (and necessary overhead) for the > higher throughput with micro-batching (but lesser control). In short, you > can't expect to rollback a message and not affect the whole batch. > > However, if you 'commit' this batch as received by your processor, and > take on the responsibility of storing, tracking and commit/rollback of > those yourself for downstream connection But then, why? > > In general, one should leverage NiFi 'Scheduling' tab and have the > micro-batching aspect controlled via the framework. Unless you really > really have a very good reason to do it yourself. > > Hope this helps, > Andrew > > > > > On 3/7/16, 5:00 PM, "Devin Fisher" > wrote: > > >Question about rollbacks. I have a processor that is grabbing a list of > >FlowFiles from session.get(100). It will then process each flow file one > at > >a time. I want to then be able if there is an error with a single > FlowFile > >to roll it back (and only this failed FlowFile) and transfer it to the > >FAILED relationship. But reading the javadoc for ProcessSession I don't > get > >the sense that I can do that. > > > >Is my workflow wrong, should I only get one at a time from the session and > >commit after each one? > > > >Devin >
Re: Cross NAR Controller Services
Devin, DBCPConnectionPool is an implementation of the DBCPService interface. The DBCPService interface is in the nifi-dbcp-service-api and is the way you'd interact with the instance(s), which currently happen to be DBCPConnectionPool objects. If you implement your own class that implements DBCPService (_not_ DBCPConnectionPool), you will need to declare it for the ServiceLoader, namely as a file in your project called src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService containing a line with the fully-qualified name of your implementation. You can follow the DBCPConnectionPool example, but note that you would not refer to DBCPConnectionPool at all in your NAR. Hopefully this hasn't made things more confusing, please let me know if you have more questions. Regards, Matt On Tue, Mar 15, 2016 at 6:51 PM, Devin Fisher < devin.fis...@perfectsearchcorp.com> wrote: > I'm having issues using a standard controller service (DBCPConnectionPool) > that is provided by nifi-dbcp-service-nar. But I'm having issues with my > nar. I have included a dependency on nifi-dbcp-service-api in my maven pom > and have used the property description that is the same as ExecuteSQL for > the DBCP_SERVICE property. When I load my processor in nifi I don't get a > list of DBCPConnectionPool controller service like I expect. I have an > ExecuteSQL processor in the same flow (for testing) and it list the > controller service I created just fine and uses it just fine. > > The problem seems to me (I don't have a development environment to confirm) > that the DBCPService.class that I use in my processor is not seen as the > same class object (because of the isolation features of NAR) as the one > that DBCPCOnnectionPool implements. I think I have mostly confirmed this by > implementing a dummy controller service that implements DBCPService in the > same NAR as my processor and my processor is able to list it just fine. But > the ExecuteSQL don't list my dummy controller service. So they seem to be > considered different classes. > > I think I'm doing something wrong because ExecuteSQL is not in the same nar > as DBCPConnectionPool. So they play nice together somehow but I don't see > what I need to do so that my nar works the same way. > > I'm enjoying developing against nifi and sorry if this is a rookie mistake. > > Devin >
Cross NAR Controller Services
I'm having issues using a standard controller service (DBCPConnectionPool) that is provided by nifi-dbcp-service-nar. But I'm having issues with my nar. I have included a dependency on nifi-dbcp-service-api in my maven pom and have used the property description that is the same as ExecuteSQL for the DBCP_SERVICE property. When I load my processor in nifi I don't get a list of DBCPConnectionPool controller service like I expect. I have an ExecuteSQL processor in the same flow (for testing) and it list the controller service I created just fine and uses it just fine. The problem seems to me (I don't have a development environment to confirm) that the DBCPService.class that I use in my processor is not seen as the same class object (because of the isolation features of NAR) as the one that DBCPCOnnectionPool implements. I think I have mostly confirmed this by implementing a dummy controller service that implements DBCPService in the same NAR as my processor and my processor is able to list it just fine. But the ExecuteSQL don't list my dummy controller service. So they seem to be considered different classes. I think I'm doing something wrong because ExecuteSQL is not in the same nar as DBCPConnectionPool. So they play nice together somehow but I don't see what I need to do so that my nar works the same way. I'm enjoying developing against nifi and sorry if this is a rookie mistake. Devin
Re: Rollbacks
Devin, What you're asking for is a contradicting requirement. One trades individual message transactional control (and necessary overhead) for the higher throughput with micro-batching (but lesser control). In short, you can't expect to rollback a message and not affect the whole batch. However, if you 'commit' this batch as received by your processor, and take on the responsibility of storing, tracking and commit/rollback of those yourself for downstream connection But then, why? In general, one should leverage NiFi 'Scheduling' tab and have the micro-batching aspect controlled via the framework. Unless you really really have a very good reason to do it yourself. Hope this helps, Andrew On 3/7/16, 5:00 PM, "Devin Fisher"wrote: >Question about rollbacks. I have a processor that is grabbing a list of >FlowFiles from session.get(100). It will then process each flow file one at >a time. I want to then be able if there is an error with a single FlowFile >to roll it back (and only this failed FlowFile) and transfer it to the >FAILED relationship. But reading the javadoc for ProcessSession I don't get >the sense that I can do that. > >Is my workflow wrong, should I only get one at a time from the session and >commit after each one? > >Devin
[GitHub] nifi pull request: Nifi 1516 - AWS DynamoDB Get/Put/Delete Process...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/224#discussion_r56249462 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.dynamodb; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.TableWriteItems; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; + +@SupportsBatching +@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert"}) +@CapabilityDescription("Puts a document from DynamoDB based on hash and range key. The table can have either hash and range or hash key alone." ++ " Currently the keys supported are string and number and value can be json document. " ++ "In case of hash and range keys both key are required for the operation." ++ " The FlowFile content must be JSON. FlowFile content is mapped to the specified Json Document attribute in the DynamoDB item.") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description =
[GitHub] nifi pull request: Nifi 1516 - AWS DynamoDB Get/Put/Delete Process...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/224#discussion_r56249382 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.dynamodb; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.TableWriteItems; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; + +@SupportsBatching +@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert"}) +@CapabilityDescription("Puts a document from DynamoDB based on hash and range key. The table can have either hash and range or hash key alone." ++ " Currently the keys supported are string and number and value can be json document. " ++ "In case of hash and range keys both key are required for the operation." ++ " The FlowFile content must be JSON. FlowFile content is mapped to the specified Json Document attribute in the DynamoDB item.") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description =
[GitHub] nifi pull request: Nifi 1516 - AWS DynamoDB Get/Put/Delete Process...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/224#discussion_r56242141 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.dynamodb; + +import java.math.BigDecimal; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; + +/** + * Base class for Nifi dynamo db related processors + * + * @see DeleteDynamoDB + * @see PutDynamoDB + * @see GetDynamoDB + */ +public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsProviderProcessor { + +public static final Relationship REL_UNPROCESSED = new Relationship.Builder().name("unprocessed") +.description("FlowFiles are routed to unprocessed relationship when DynamoDB is not able to process " + + "all the items in the request. Typical reasons are insufficient table throughput capacity and exceeding the maximum bytes per request. " + + "Unprocessed FlowFiles can be retried with a new request.").build(); + +public static final AllowableValue ALLOWABLE_VALUE_STRING = new AllowableValue("string"); +public static final AllowableValue ALLOWABLE_VALUE_NUMBER = new AllowableValue("number"); + +public static final String DYNAMODB_KEY_ERROR_UNPROCESSED = "dynamodb.key.error.unprocessed"; +public static final String DYNAMODB_RANGE_KEY_VALUE_ERROR = "dynmodb.range.key.value.error"; +public static final String DYNAMODB_HASH_KEY_VALUE_ERROR = "dynmodb.hash.key.value.error"; +public static final String DYNAMODB_KEY_ERROR_NOT_FOUND = "dynamodb.key.error.not.found"; +public static final String DYNAMODB_ERROR_EXCEPTION_MESSAGE = "dynamodb.error.exception.message"; +public static final String DYNAMODB_ERROR_CODE = "dynamodb.error.code"; +public static final String DYNAMODB_ERROR_MESSAGE = "dynamodb.error.message"; +public static final String DYNAMODB_ERROR_TYPE = "dynamodb.error.type"; +public static final String DYNAMODB_ERROR_SERVICE = "dynamodb.error.service"; +public static final String DYNAMODB_ERROR_RETRYABLE = "dynamodb.error.retryable"; +public static final String DYNAMODB_ERROR_REQUEST_ID = "dynamodb.error.request.id"; +public static final String DYNAMODB_ERROR_STATUS_CODE = "dynamodb.error.status.code"; +public static final String DYNAMODB_ITEM_HASH_KEY_VALUE = " dynamodb.item.hash.key.value"; +public static final String DYNAMODB_ITEM_RANGE_KEY_VALUE = " dynamodb.item.range.key.value"; +public static final String DYNAMODB_ITEM_IO_ERROR = "dynamodb.item.io.error"; + +protected static final String
[GitHub] nifi pull request: Nifi 1516 - AWS DynamoDB Get/Put/Delete Process...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/224#discussion_r56241934 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.dynamodb; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.TableWriteItems; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; + +@SupportsBatching +@SeeAlso({GetDynamoDB.class, PutDynamoDB.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"Amazon", "DynamoDB", "AWS", "Delete", "Remove"}) +@CapabilityDescription("Deletes a document from DynamoDB based on hash and range key. The key can be string or number." ++ " The request requires all the primary keys for the operation (hash or hash and range key)") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db error request id"), +@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db status code") +}) +@ReadsAttributes({ +@ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items hash key value" ), +@ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE,
[GitHub] nifi pull request: NIFI-1047 Added additional method to get proces...
Github user trkurc commented on the pull request: https://github.com/apache/nifi/pull/278#issuecomment-197021816 This makes shutdown work in cywin, but not for the right reason. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1047 Added additional method to get proces...
Github user trkurc closed the pull request at: https://github.com/apache/nifi/pull/278 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56237562 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java --- @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.Row; +import com.google.common.collect.Sets; +import org.apache.nifi.authorization.exception.ProviderCreationException; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Unit tests for the AbstractCassandraProcessor class + */ +public class AbstractCassandraProcessorTest { + +MockAbstractCassandraProcessor processor; +private TestRunner testRunner; + +@Before +public void setUp() throws Exception { +processor = new MockAbstractCassandraProcessor(); +testRunner = TestRunners.newTestRunner(processor); +} + +@Test +public void testCustomValidate() throws Exception { --- End diff -- Since you already have a testCustomValidate, might as well add check for testing a port that isn't in the valid range. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56237303 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.UnavailableException; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the PutCassandraQL processor + */ +public class PutCassandraQLTest { + +private TestRunner testRunner; +private MockPutCassandraQL processor; + +@Before +public void setUp() throws Exception { +processor = new MockPutCassandraQL(); +testRunner = TestRunners.newTestRunner(processor); +} + +@Test +public void testProcessorConfigValidity() { +testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042"); +testRunner.assertValid(); +testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "password"); +testRunner.assertNotValid(); +testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "username"); + testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE"); +testRunner.assertValid(); +} + +@Test +public void testProcessorHappyPath() { +setUpStandardTestConfig(); + +testRunner.run(1, true, true); + testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_SUCCESS, 1); +testRunner.clearTransferState(); +} + +@Test +public void testProcessorInvalidQueryException() { +setUpStandardTestConfig(); + +// Test exceptions +processor.setExceptionToThrow( +new InvalidQueryException(new InetSocketAddress("localhost", 9042), "invalid query")); +testRunner.enqueue("UPDATE users SET cities = [ 'New York', 'Los Angeles' ] WHERE user_id = 'coast2coast';"); +testRunner.run(1, true, true); + testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_FAILURE, 1); +testRunner.clearTransferState(); +} + +@Test +public void testProcessorUnavailableException() { +setUpStandardTestConfig(); + +processor.setExceptionToThrow( +new UnavailableException(new InetSocketAddress("localhost", 9042), ConsistencyLevel.ALL, 5, 2));
[GitHub] nifi pull request: NIFI-1575: Add QueryDatabaseTable processor
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/261#discussion_r56235010 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java --- @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." ++ " Streaming is used so arbitrarily large result sets are supported. This
[GitHub] nifi pull request: NIFI-1575: Add QueryDatabaseTable processor
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/261#discussion_r56234935 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java --- @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." ++ " Streaming is used so arbitrarily large result sets are supported. This
[GitHub] nifi pull request: NIFI-1118 Update SplitText Processor - add supp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/280#discussion_r56234292 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -134,6 +168,28 @@ protected void init(final ProcessorInitializationContext context) { } @Override +protected Collection customValidate(ValidationContext validationContext) { +ArrayList results = new ArrayList<>(); + +results.add(new ValidationResult.Builder() +.subject("Remove Trailing Newlines") + .valid(!validationContext.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean()) +.explanation("Property is deprecated; value must be set to false. Future releases may remove this Property.") +.build()); + +final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 +&& validationContext.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B) == null); --- End diff -- I think this line should be: && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet() ); --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1118 Update SplitText Processor - add supp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/280#discussion_r56234376 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -134,6 +168,28 @@ protected void init(final ProcessorInitializationContext context) { } @Override +protected Collection customValidate(ValidationContext validationContext) { +ArrayList results = new ArrayList<>(); --- End diff -- Is there any reason we are declaring the type here as ArrayList, rather than List? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1118 Update SplitText Processor - add supp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/280#discussion_r56233876 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -63,9 +71,16 @@ @SupportsBatching @Tags({"split", "text"}) @InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines") +//@CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines") --- End diff -- The commented-out line should probably be removed :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1575: Add QueryDatabaseTable processor
Github user markap14 commented on the pull request: https://github.com/apache/nifi/pull/261#issuecomment-197004580 Matt: left a few comments inline. Mostly regarding error handling. Otherwise, looks good! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1575: Add QueryDatabaseTable processor
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/261#discussion_r56232920 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java --- @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." ++ " Streaming is used so arbitrarily large result sets are supported. This processor
[GitHub] nifi pull request: NIFI-1575: Add QueryDatabaseTable processor
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/261#discussion_r56232701 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java --- @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." ++ " Streaming is used so arbitrarily large result sets are supported. This processor
[GitHub] nifi pull request: NIFI-614 Added initial support for new style JM...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/222 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1575: Add QueryDatabaseTable processor
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/261#discussion_r56232384 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java --- @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." ++ " Streaming is used so arbitrarily large result sets are supported. This processor
[GitHub] nifi pull request: NIFI-1575: Add QueryDatabaseTable processor
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/261#discussion_r56232288 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java --- @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." ++ " Streaming is used so arbitrarily large result sets are supported. This processor
[GitHub] nifi pull request: NIFI-1575: Add QueryDatabaseTable processor
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/261#discussion_r56231653 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java --- @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." ++ " Streaming is used so arbitrarily large result sets are supported. This processor
[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-197000303 Looking good going to AWS endpoints and services. Good stuff. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1488 Refactoring HBase Kerberos support
GitHub user bbende opened a pull request: https://github.com/apache/nifi/pull/281 NIFI-1488 Refactoring HBase Kerberos support - Storing UGI so we can support multiple HBaseClientServices with different configs - Creating nifi-hadoop-utils to hold utility code shared between HDFS and HBase processors - Incorporating KerberosProperties into existing hadoop processors You can merge this pull request into a Git repository by running: $ git pull https://github.com/bbende/nifi NIFI-1488 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/281.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #281 commit 42766156c67542dc02d9ea9d8418c7f86bd7 Author: Bryan BendeDate: 2016-03-15T18:58:03Z NIFI-1488 Refactoring HBase Kerberos support - Storing UGI so we can support multiple HBaseClientServices with different configs - Creating nifi-hadoop-utils to hold utility code shared between HDFS and HBase processors - Incorporating KerberosProperties into existing hadoop processors --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56216735 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type.") +@ReadsAttributes({ +@ReadsAttribute(attribute = "cql.args.N.type", +description = "Incoming FlowFiles are expected to be parameterized CQL statements. The type of each " ++ "parameter is specified as a lowercase string corresponding to the Cassandra data type (text, " ++ "int, boolean, e.g.). In the case of collections, the primitive type(s) of the elements in the " ++ "collection should be comma-delimited, follow the collection type, and be enclosed in
[GitHub] nifi pull request: NIFI-1047 Added additional method to get proces...
Github user trkurc commented on a diff in the pull request: https://github.com/apache/nifi/pull/278#discussion_r56214991 --- Diff: nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java --- @@ -1131,6 +1135,22 @@ public void run() { this.loggingFutures = futures; } +private Long getPidViaRuntimeMXBean(final Logger logger) { --- End diff -- Not a bad idea - I think the same goes for the method that calls this. This is kind of attempt #2 for getting a pid. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56214671 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java --- @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SSLOptions; +import com.datastax.driver.core.Session; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.exception.ProviderCreationException; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.ssl.SSLContextService; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * AbstractCassandraProcessor is a base class for Cassandra processors and contains logic and variables common to most + * processors integrating with Apache Cassandra. + */ +public abstract class AbstractCassandraProcessor extends AbstractProcessor { + +public static final int DEFAULT_CASSANDRA_PORT = 9042; + +private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() { +@Override +public ValidationResult validate(final String subject, final String input, final ValidationContext context) { +final List esList = Arrays.asList(input.split(",")); +for (String hostnamePort : esList) { +String[] addresses = hostnamePort.split(":"); +// Protect against invalid input like http://127.0.0.1:9042 (URL scheme should not be there) +if (addresses.length != 2) { +return new ValidationResult.Builder().subject(subject).input(input).explanation( +"Each entry must be in hostname:port form (no scheme such as http://, and port must be specified)") --- End diff -- I may have commented on this before. Wouldn't it be easier to simply have two fields host/port and avid having this validator all together? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1047 Added additional method to get proces...
Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/278#discussion_r56213996 --- Diff: nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java --- @@ -1131,6 +1135,22 @@ public void run() { this.loggingFutures = futures; } +private Long getPidViaRuntimeMXBean(final Logger logger) { --- End diff -- Perhaps document that this code is still subject to JVM compatibility? In other words may not work on all JVMs although I can confirm that on most JVMs I've used in the past this approach does work. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-196956296 @mans2singh Overall, the changes look good and appreciate the inclusion of some additional tests. Going to do some functional testing but think we can get this merged in. There are, unfortunately, a lot of conflicts, so for some of the minor points mentioned in the review, would you also be able to please get this rebased so we are good to go for a merge? Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r56212999 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. " ++ "In order to send data to firehose, the firehose delivery stream name has to be specified.") +@WritesAttributes({ +@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")}) +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { + +/** + * Kinesis put record response error message + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message"; + +/** + * Kinesis put record response error code + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code"; + +/** + * Kinesis put record response record id + */ +public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id"; + +public static final List properties = Collections.unmodifiableList( +Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, + PROXY_HOST,PROXY_HOST_PORT)); + +/** + * Max buffer size 1000kb + */ +public static final int MAX_MESSAGE_SIZE = 1000 * 1024; + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@Override +public void onTrigger(final ProcessContext context, final ProcessSession session) { +FlowFile flowFileCandidate = session.get(); +if ( flowFileCandidate == null ) +return; + +
[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r56212702 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. " ++ "In order to send data to firehose, the firehose delivery stream name has to be specified.") +@WritesAttributes({ +@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")}) +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { + +/** + * Kinesis put record response error message + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message"; + +/** + * Kinesis put record response error code + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code"; + +/** + * Kinesis put record response record id + */ +public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id"; + +public static final List properties = Collections.unmodifiableList( +Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, + PROXY_HOST,PROXY_HOST_PORT)); + +/** + * Max buffer size 1000kb + */ +public static final int MAX_MESSAGE_SIZE = 1000 * 1024; + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@Override +public void onTrigger(final ProcessContext context, final ProcessSession session) { +FlowFile flowFileCandidate = session.get(); +if ( flowFileCandidate == null ) +return; + +
[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r56212500 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. " ++ "In order to send data to firehose, the firehose delivery stream name has to be specified.") +@WritesAttributes({ +@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")}) +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { + +/** + * Kinesis put record response error message + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message"; + +/** + * Kinesis put record response error code + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code"; + +/** + * Kinesis put record response record id + */ +public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id"; + +public static final List properties = Collections.unmodifiableList( +Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, + PROXY_HOST,PROXY_HOST_PORT)); + +/** + * Max buffer size 1000kb --- End diff -- Now that we have a buffer, should likely call this message size --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r56212428 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; + +/** + * This class provides processor the base class for kinesis firehose + */ +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor { + +public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder() +.name("Amazon Kinesis Firehose Delivery Stream Name") +.description("The name of kinesis firehose delivery stream") +.expressionLanguageSupported(false) +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("Batch Size") +.description("Batch size for messages (1-500).") +.defaultValue("250") +.required(false) +.addValidator(StandardValidators.createLongValidator(1, 500, true)) +.sensitive(false) +.build(); + +public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder() +.name("Max message buffer size (MB)") +.description("Max message buffer size (1-50) MB") +.defaultValue("1") +.required(false) +.addValidator(StandardValidators.createLongValidator(1, 50, true)) --- End diff -- Likely want to prefer utilization of the DATA_SIZE_VALIDATOR. Not sure we need to strictly cap this as we are guiding users toward an appropriate path and if they have conditions where they wish to go above the maximum (as well as supporting hardware) that should be fine. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Re: Unable to Copy data from local NIFI to Cluster in AWS
I am attaching log info: 2016-03-15 12:39:24,699 ERROR [Timer-Driven Process Thread-3] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=FromRemote,target=http://.compute-1.amazonaws.com:1/nifi] failed to communicate with http://xxx.compute-1.amazonaws.com:1/nifi due to java.net.ConnectException: Operation timed out 2016-03-15 12:39:24,700 INFO [Timer-Driven Process Thread-10] o.a.n.r.c.socket.EndpointConnectionPool New Weighted Distribution of Nodes: Node[10.xxx.200.yyy:0] will receive 100.0% of data 2016-03-15 12:39:55,920 INFO [NiFi Site-to-Site Connection Pool Maintenance] o.a.n.r.c.socket.EndpointConnectionPool EndpointConnectionPool[Cluster URL=http://.compute-1.amazonaws.com:1/nifi] Successfully refreshed Peer Status; remote instance consists of 1 peers -- View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/Unable-Copy-data-from-local-NIFI-to-Cluster-in-AWS-tp8099p8102.html Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.
[GitHub] nifi pull request: NIFI-1464 fixed OnScheduled invocation to pass ...
Github user olegz closed the pull request at: https://github.com/apache/nifi/pull/279 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r56201575 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. " ++ "In order to send data to firehose, the firehose delivery stream name has to be specified.") +@WritesAttributes({ +@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")}) +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { + +/** + * Kinesis put record response error message + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message"; + +/** + * Kinesis put record response error code + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code"; + +/** + * Kinesis put record response record id + */ +public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id"; + +public static final List properties = Collections.unmodifiableList( +Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, + PROXY_HOST,PROXY_HOST_PORT)); + +/** + * Max buffer size 1000kb + */ +public static final int MAX_MESSAGE_SIZE = 1000 * 1024; + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@Override +public void onTrigger(final ProcessContext context, final ProcessSession session) { +FlowFile flowFileCandidate = session.get(); +if ( flowFileCandidate == null ) +return; + +
Unable Copy data from local NIFI to Cluster in AWS
Hi I am running NIFI in my MAC and NIFI cluster in AWS. RPG is able to see port and nodes in AWS cluster. However, when I try to send data from local NIFI to AWS cluster, I getting below error. Any help appreciated. RemoteGroupPort[name=FromRemote,target=http://.compute-1.amazonaws.com:1/nifi] failed to communicate with http://.compute-1.amazonaws.com:1/nifi due to java.net.ConnectException: Operation timed out -- View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/Unable-Copy-data-from-local-NIFI-to-Cluster-in-AWS-tp8099.html Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.
Process Creation
To whom it may concern, I am trying to create a process that will export data from a MySQL database using a select query. This data will in turn be used to either update or insert records of another MySQL database. At the moment my process in NiFi includes the flowing Process flow: ExecuteSQL - running the select query ConvertAvroToJSON - to clean up the output file which according to the document can be used to create a SQL update or insert query PutFile - used for testing to view output as I'm new to NiFi The other process is: GetFile - obtains the output of the first process ConvertJSONToSQL - using this to take the JSON and make the SQL Insert query PutSQL - Should take the query and run it entering the data into the DB The problem I'm having is once the JSON file hits the ConvertJSONToSQL an error is return about the other table's ID field not being found. This ID filed is auto incrementing and doesn't (or shouldn't in my mind) need to be provided. Could you provide any assistance and/or design advice to get this process up and running correctly would be great. Thanks, Josh
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56191239 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.UnavailableException; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the PutCassandraQL processor + */ +public class PutCassandraQLTest { + +private TestRunner testRunner; +private MockPutCassandraQL processor; + +@Before +public void setUp() throws Exception { +processor = new MockPutCassandraQL(); +testRunner = TestRunners.newTestRunner(processor); +} + +@Test +public void testProcessor() { +testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042"); +testRunner.assertValid(); +testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "password"); +testRunner.assertNotValid(); +testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "username"); + testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE"); +testRunner.assertValid(); + +testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties) VALUES ?, ?, ?, ?", +new HashMap() { +{ +put("cql.args.1.type", "int"); +put("cql.args.1.value", "1"); +put("cql.args.2.type", "text"); +put("cql.args.2.value", "Joe"); +put("cql.args.3.type", "text"); +// No value for arg 3 to test setNull +put("cql.args.4.type", "map "); +put("cql.args.4.value", "{'a':'Hello', 'b':'World'}"); +put("cql.args.5.type", "list"); +put("cql.args.5.value", "[true,false,true]"); +put("cql.args.6.type", "set"); +put("cql.args.6.value", "{1.0, 2.0}"); +put("cql.args.7.type", "bigint"); +put("cql.args.7.value", "2000"); +put("cql.args.8.type", "float"); +put("cql.args.8.value", "1.0");
Re: Closing in on the Apache NiFi 0.6.0 release
Sounds great. Will scope those out in the process. Thanks, Tony. On Tue, Mar 15, 2016 at 11:37 AM, Tony Kurcwrote: > Aldrin, > I did some crappy shell scripts to help, I'll try to put those up on the > wiki. > > Tony > > On Tue, Mar 15, 2016 at 11:29 AM, Aldrin Piri > wrote: > > > In an effort to avoid me missing helpers and spamming the list, I will > > throw my hat into the ring to do this one. > > > > On Tue, Mar 15, 2016 at 11:28 AM, Joe Witt wrote: > > > > > Team, > > > > > > Seeing good progress today. Great! > > > > > > I know I volunteered to do the RM for the release but now I need to be > > > out of town and will have questionable Internet access. Can someone > > > else grab it? > > > > > > Thanks > > > Joe > > > > > > On Mon, Mar 14, 2016 at 9:01 PM, Joe Witt wrote: > > > > Team, > > > > > > > > Things are closing in well but we need to get pretty specific on > these > > > > to keep the release moving roughly along the lines of the schedule > > > > we've discussed previously. Those of you who have tickets on here > > > > that can be moved to 0.7.0 please do so. Otherwise, let's please > keep > > > > discussion/status information up so we can all see how we're trending > > > > toward release candidate readiness. > > > > > > > > The current items on the release based on JIRA fix version that > remain > > > are: > > > > > > > > - Supporting a new JMS producer/consumer style [1] [2] [3] > > > > [status]Great rounds of review with Oleg, Percivall, Moser. Looks > > close. > > > > > > > > - Processors to interact with Apache Cassandra [4] > > > > [status]Looks ready but needing review. > > > > > > > > - Additional functionality/cleanup for SplitText [5] > > > > [status]Still in discussions. Recommend we move this change to > 0.7.0. > > > > > > > > - Support Kerberos based authentication to REST API [6] > > > > [status]Update suggests it is almost ready. A suggested key feature > > > > of this release. > > > > > > > > - Add Kerberos support to HBase processors [7] > > > > [status]Appears nearly ready. Great discussions and group contribs. > > > > > > > > - Add support for Spring Context loaded processors (Spring > > > > Integrations, Camel, ...) [8] > > > > [status]Appears ready. Needs review. > > > > > > > > - Support based database snapshot/query/change capture [9] > > > > [status] Appears ready. Needs review. > > > > > > > > - Allow empty content posting w/o content type header [10] > > > > [status] Discussions led to changing behavior a bit. Possibly slide > > > > to 0.7.0 as an alternative approach exists for now. > > > > > > > > - Support Kinesis Firehose interaction [11] > > > > [status] Appears to be a candidate to slide to 0.7.0 > > > > > > > > - Support Apache Spark 1.6 for Spark receiver [12] > > > > [status] Appears ready but needing review. > > > > > > > > - Support SSL with AMQP processors [13] > > > > [status] Appears ready but needs review as it adds to a commonly used > > > > SSL interface. > > > > > > > > [1] https://issues.apache.org/jira/browse/NIFI-614 > > > > [2] https://issues.apache.org/jira/browse/NIFI-1424 > > > > [3] https://issues.apache.org/jira/browse/NIFI-1425 > > > > [4] https://issues.apache.org/jira/browse/NIFI-901 > > > > [5] https://issues.apache.org/jira/browse/NIFI-1118 > > > > [6] https://issues.apache.org/jira/browse/NIFI-1274 > > > > [7] https://issues.apache.org/jira/browse/NIFI-1488 > > > > [8] https://issues.apache.org/jira/browse/NIFI-1571 > > > > [9] https://issues.apache.org/jira/browse/NIFI-1575 > > > > [10] https://issues.apache.org/jira/browse/NIFI-1620 > > > > [11] https://issues.apache.org/jira/browse/NIFI-1495 > > > > [12] https://issues.apache.org/jira/browse/NIFI-1519 > > > > [13] https://issues.apache.org/jira/browse/NIFI-1521 > > > > > > > > On Thu, Mar 10, 2016 at 2:10 PM, Joe Witt > wrote: > > > >> Have updated the migration guide [1] and release notes [2] for > 0.6.0. > > > >> If there are features/inclusions will adjust as needed. I intend to > > > >> follow a very similar RM model to what Tony did for the 0.5.x line > and > > > >> which follows the apparent consensus from our recent Git > discussions. > > > >> > > > >> We'll definitely need folks to really focus in on the existing JIRAs > > > >> slated for 0.6.0 and any necessary reviews or tweaks. Early next > week > > > >> we should start moving tickets to the next minor (0.7.0) build that > > > >> aren't key bug fixes or aren't tied to previously stated 0.6.0 > goals. > > > >> > > > >> [1] > > https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance > > > >> [2] https://cwiki.apache.org/confluence/display/NIFI/Release+Notes > > > >> > > > >> Thanks > > > >> Joe > > > >> > > > >> On Wed, Mar 9, 2016 at 6:08 PM, Tony Kurc wrote: > > > >>> Joe, > > > >>> I tagged this one that I've been closing in on, and was just > > finishing > > > up. > >
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56187278 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type. The content of the FlowFile is expected to be in UTF-8 format.") +@ReadsAttributes({ +@ReadsAttribute(attribute = "cql.args.N.type", +description = "Incoming FlowFiles are expected to be parameterized CQL statements. The type of each " ++ "parameter is specified as a lowercase string corresponding to the Cassandra data type (text, " ++ "int, boolean, e.g.). In the case of collections, the primitive type(s) of the elements in the " ++ "collection should be
[GitHub] nifi pull request: NIFI-1118 Update SplitText Processor - add supp...
GitHub user jskora opened a pull request: https://github.com/apache/nifi/pull/280 NIFI-1118 Update SplitText Processor - add support for split size limits and header line markers. * Add "Maximum Fragment Size" property. A new split file will be created if the next line to be added to the current split file exceeds this user-defined maximum file size. In the case where an input line is longer than the fragment size, this line will be output in a separate split file that will exceed the maximum fragment size. * Add "Header Line Marker Character" property. Lines that begin with these user-defined character(s) will be considered header line(s) rather than a predetermined number of lines. The existing property "Header Line Count" must be zero for this new property and behavior to be used. * Deprecated the "Remove Trailing Newlines" property. * Fixed conditional that incorrectly suppressed splits where the content line count equaled the header line count and did not remove empty splits from the session. * Minor formatting cleanup. * Exclude test files from RAT check in pom.xml. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jskora/nifi NIFI-1118 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/280.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #280 commit 914c5a2c52b19d153077a89b59f751aa49ddf86c Author: Joe SkoraDate: 2016-03-15T14:31:00Z NIFI-1118 Update SplitText Processor - add support for split size limits and header line markers. * Add "Maximum Fragment Size" property. A new split file will be created if the next line to be added to the current split file exceeds this user-defined maximum file size. In the case where an input line is longer than the fragment size, this line will be output in a separate split file that will exceed the maximum fragment size. * Add "Header Line Marker Character" property. Lines that begin with these user-defined character(s) will be considered header line(s) rather than a predetermined number of lines. The existing property "Header Line Count" must be zero for this new property and behavior to be used. * Fixed conditional that incorrectly suppressed splits where the content line count equaled the header line count and did not remove empty splits from the session. * Minor formatting cleanup. * Exclude test files from RAT check in pom.xml. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Re: Closing in on the Apache NiFi 0.6.0 release
Aldrin, I did some crappy shell scripts to help, I'll try to put those up on the wiki. Tony On Tue, Mar 15, 2016 at 11:29 AM, Aldrin Piriwrote: > In an effort to avoid me missing helpers and spamming the list, I will > throw my hat into the ring to do this one. > > On Tue, Mar 15, 2016 at 11:28 AM, Joe Witt wrote: > > > Team, > > > > Seeing good progress today. Great! > > > > I know I volunteered to do the RM for the release but now I need to be > > out of town and will have questionable Internet access. Can someone > > else grab it? > > > > Thanks > > Joe > > > > On Mon, Mar 14, 2016 at 9:01 PM, Joe Witt wrote: > > > Team, > > > > > > Things are closing in well but we need to get pretty specific on these > > > to keep the release moving roughly along the lines of the schedule > > > we've discussed previously. Those of you who have tickets on here > > > that can be moved to 0.7.0 please do so. Otherwise, let's please keep > > > discussion/status information up so we can all see how we're trending > > > toward release candidate readiness. > > > > > > The current items on the release based on JIRA fix version that remain > > are: > > > > > > - Supporting a new JMS producer/consumer style [1] [2] [3] > > > [status]Great rounds of review with Oleg, Percivall, Moser. Looks > close. > > > > > > - Processors to interact with Apache Cassandra [4] > > > [status]Looks ready but needing review. > > > > > > - Additional functionality/cleanup for SplitText [5] > > > [status]Still in discussions. Recommend we move this change to 0.7.0. > > > > > > - Support Kerberos based authentication to REST API [6] > > > [status]Update suggests it is almost ready. A suggested key feature > > > of this release. > > > > > > - Add Kerberos support to HBase processors [7] > > > [status]Appears nearly ready. Great discussions and group contribs. > > > > > > - Add support for Spring Context loaded processors (Spring > > > Integrations, Camel, ...) [8] > > > [status]Appears ready. Needs review. > > > > > > - Support based database snapshot/query/change capture [9] > > > [status] Appears ready. Needs review. > > > > > > - Allow empty content posting w/o content type header [10] > > > [status] Discussions led to changing behavior a bit. Possibly slide > > > to 0.7.0 as an alternative approach exists for now. > > > > > > - Support Kinesis Firehose interaction [11] > > > [status] Appears to be a candidate to slide to 0.7.0 > > > > > > - Support Apache Spark 1.6 for Spark receiver [12] > > > [status] Appears ready but needing review. > > > > > > - Support SSL with AMQP processors [13] > > > [status] Appears ready but needs review as it adds to a commonly used > > > SSL interface. > > > > > > [1] https://issues.apache.org/jira/browse/NIFI-614 > > > [2] https://issues.apache.org/jira/browse/NIFI-1424 > > > [3] https://issues.apache.org/jira/browse/NIFI-1425 > > > [4] https://issues.apache.org/jira/browse/NIFI-901 > > > [5] https://issues.apache.org/jira/browse/NIFI-1118 > > > [6] https://issues.apache.org/jira/browse/NIFI-1274 > > > [7] https://issues.apache.org/jira/browse/NIFI-1488 > > > [8] https://issues.apache.org/jira/browse/NIFI-1571 > > > [9] https://issues.apache.org/jira/browse/NIFI-1575 > > > [10] https://issues.apache.org/jira/browse/NIFI-1620 > > > [11] https://issues.apache.org/jira/browse/NIFI-1495 > > > [12] https://issues.apache.org/jira/browse/NIFI-1519 > > > [13] https://issues.apache.org/jira/browse/NIFI-1521 > > > > > > On Thu, Mar 10, 2016 at 2:10 PM, Joe Witt wrote: > > >> Have updated the migration guide [1] and release notes [2] for 0.6.0. > > >> If there are features/inclusions will adjust as needed. I intend to > > >> follow a very similar RM model to what Tony did for the 0.5.x line and > > >> which follows the apparent consensus from our recent Git discussions. > > >> > > >> We'll definitely need folks to really focus in on the existing JIRAs > > >> slated for 0.6.0 and any necessary reviews or tweaks. Early next week > > >> we should start moving tickets to the next minor (0.7.0) build that > > >> aren't key bug fixes or aren't tied to previously stated 0.6.0 goals. > > >> > > >> [1] > https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance > > >> [2] https://cwiki.apache.org/confluence/display/NIFI/Release+Notes > > >> > > >> Thanks > > >> Joe > > >> > > >> On Wed, Mar 9, 2016 at 6:08 PM, Tony Kurc wrote: > > >>> Joe, > > >>> I tagged this one that I've been closing in on, and was just > finishing > > up. > > >>> > > >>> https://issues.apache.org/jira/browse/NIFI-1481 > > >>> > > >>> On Wed, Mar 9, 2016 at 5:42 PM, Joe Witt wrote: > > >>> > > Team, > > > > It is time to start pulling in for the Apache NiFi 0.6.0 release to > > keep with our previously suggested cadence. There are already a lot > > of really nice improvements/bug fixes on
Re: Closing in on the Apache NiFi 0.6.0 release
In an effort to avoid me missing helpers and spamming the list, I will throw my hat into the ring to do this one. On Tue, Mar 15, 2016 at 11:28 AM, Joe Wittwrote: > Team, > > Seeing good progress today. Great! > > I know I volunteered to do the RM for the release but now I need to be > out of town and will have questionable Internet access. Can someone > else grab it? > > Thanks > Joe > > On Mon, Mar 14, 2016 at 9:01 PM, Joe Witt wrote: > > Team, > > > > Things are closing in well but we need to get pretty specific on these > > to keep the release moving roughly along the lines of the schedule > > we've discussed previously. Those of you who have tickets on here > > that can be moved to 0.7.0 please do so. Otherwise, let's please keep > > discussion/status information up so we can all see how we're trending > > toward release candidate readiness. > > > > The current items on the release based on JIRA fix version that remain > are: > > > > - Supporting a new JMS producer/consumer style [1] [2] [3] > > [status]Great rounds of review with Oleg, Percivall, Moser. Looks close. > > > > - Processors to interact with Apache Cassandra [4] > > [status]Looks ready but needing review. > > > > - Additional functionality/cleanup for SplitText [5] > > [status]Still in discussions. Recommend we move this change to 0.7.0. > > > > - Support Kerberos based authentication to REST API [6] > > [status]Update suggests it is almost ready. A suggested key feature > > of this release. > > > > - Add Kerberos support to HBase processors [7] > > [status]Appears nearly ready. Great discussions and group contribs. > > > > - Add support for Spring Context loaded processors (Spring > > Integrations, Camel, ...) [8] > > [status]Appears ready. Needs review. > > > > - Support based database snapshot/query/change capture [9] > > [status] Appears ready. Needs review. > > > > - Allow empty content posting w/o content type header [10] > > [status] Discussions led to changing behavior a bit. Possibly slide > > to 0.7.0 as an alternative approach exists for now. > > > > - Support Kinesis Firehose interaction [11] > > [status] Appears to be a candidate to slide to 0.7.0 > > > > - Support Apache Spark 1.6 for Spark receiver [12] > > [status] Appears ready but needing review. > > > > - Support SSL with AMQP processors [13] > > [status] Appears ready but needs review as it adds to a commonly used > > SSL interface. > > > > [1] https://issues.apache.org/jira/browse/NIFI-614 > > [2] https://issues.apache.org/jira/browse/NIFI-1424 > > [3] https://issues.apache.org/jira/browse/NIFI-1425 > > [4] https://issues.apache.org/jira/browse/NIFI-901 > > [5] https://issues.apache.org/jira/browse/NIFI-1118 > > [6] https://issues.apache.org/jira/browse/NIFI-1274 > > [7] https://issues.apache.org/jira/browse/NIFI-1488 > > [8] https://issues.apache.org/jira/browse/NIFI-1571 > > [9] https://issues.apache.org/jira/browse/NIFI-1575 > > [10] https://issues.apache.org/jira/browse/NIFI-1620 > > [11] https://issues.apache.org/jira/browse/NIFI-1495 > > [12] https://issues.apache.org/jira/browse/NIFI-1519 > > [13] https://issues.apache.org/jira/browse/NIFI-1521 > > > > On Thu, Mar 10, 2016 at 2:10 PM, Joe Witt wrote: > >> Have updated the migration guide [1] and release notes [2] for 0.6.0. > >> If there are features/inclusions will adjust as needed. I intend to > >> follow a very similar RM model to what Tony did for the 0.5.x line and > >> which follows the apparent consensus from our recent Git discussions. > >> > >> We'll definitely need folks to really focus in on the existing JIRAs > >> slated for 0.6.0 and any necessary reviews or tweaks. Early next week > >> we should start moving tickets to the next minor (0.7.0) build that > >> aren't key bug fixes or aren't tied to previously stated 0.6.0 goals. > >> > >> [1] https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance > >> [2] https://cwiki.apache.org/confluence/display/NIFI/Release+Notes > >> > >> Thanks > >> Joe > >> > >> On Wed, Mar 9, 2016 at 6:08 PM, Tony Kurc wrote: > >>> Joe, > >>> I tagged this one that I've been closing in on, and was just finishing > up. > >>> > >>> https://issues.apache.org/jira/browse/NIFI-1481 > >>> > >>> On Wed, Mar 9, 2016 at 5:42 PM, Joe Witt wrote: > >>> > Team, > > It is time to start pulling in for the Apache NiFi 0.6.0 release to > keep with our previously suggested cadence. There are already a lot > of really nice improvements/bug fixes on there and some nice new > features. We do have about 23 outstanding JIRAs assigned that are > open. > > Most appear to be in great shape with active discussion centered > around PRs with review feedback. Some just appear to need the final > push over the merge line. There are also some PRs on Github that may > well be in a ready state. > > If
Re: Closing in on the Apache NiFi 0.6.0 release
Team, Seeing good progress today. Great! I know I volunteered to do the RM for the release but now I need to be out of town and will have questionable Internet access. Can someone else grab it? Thanks Joe On Mon, Mar 14, 2016 at 9:01 PM, Joe Wittwrote: > Team, > > Things are closing in well but we need to get pretty specific on these > to keep the release moving roughly along the lines of the schedule > we've discussed previously. Those of you who have tickets on here > that can be moved to 0.7.0 please do so. Otherwise, let's please keep > discussion/status information up so we can all see how we're trending > toward release candidate readiness. > > The current items on the release based on JIRA fix version that remain are: > > - Supporting a new JMS producer/consumer style [1] [2] [3] > [status]Great rounds of review with Oleg, Percivall, Moser. Looks close. > > - Processors to interact with Apache Cassandra [4] > [status]Looks ready but needing review. > > - Additional functionality/cleanup for SplitText [5] > [status]Still in discussions. Recommend we move this change to 0.7.0. > > - Support Kerberos based authentication to REST API [6] > [status]Update suggests it is almost ready. A suggested key feature > of this release. > > - Add Kerberos support to HBase processors [7] > [status]Appears nearly ready. Great discussions and group contribs. > > - Add support for Spring Context loaded processors (Spring > Integrations, Camel, ...) [8] > [status]Appears ready. Needs review. > > - Support based database snapshot/query/change capture [9] > [status] Appears ready. Needs review. > > - Allow empty content posting w/o content type header [10] > [status] Discussions led to changing behavior a bit. Possibly slide > to 0.7.0 as an alternative approach exists for now. > > - Support Kinesis Firehose interaction [11] > [status] Appears to be a candidate to slide to 0.7.0 > > - Support Apache Spark 1.6 for Spark receiver [12] > [status] Appears ready but needing review. > > - Support SSL with AMQP processors [13] > [status] Appears ready but needs review as it adds to a commonly used > SSL interface. > > [1] https://issues.apache.org/jira/browse/NIFI-614 > [2] https://issues.apache.org/jira/browse/NIFI-1424 > [3] https://issues.apache.org/jira/browse/NIFI-1425 > [4] https://issues.apache.org/jira/browse/NIFI-901 > [5] https://issues.apache.org/jira/browse/NIFI-1118 > [6] https://issues.apache.org/jira/browse/NIFI-1274 > [7] https://issues.apache.org/jira/browse/NIFI-1488 > [8] https://issues.apache.org/jira/browse/NIFI-1571 > [9] https://issues.apache.org/jira/browse/NIFI-1575 > [10] https://issues.apache.org/jira/browse/NIFI-1620 > [11] https://issues.apache.org/jira/browse/NIFI-1495 > [12] https://issues.apache.org/jira/browse/NIFI-1519 > [13] https://issues.apache.org/jira/browse/NIFI-1521 > > On Thu, Mar 10, 2016 at 2:10 PM, Joe Witt wrote: >> Have updated the migration guide [1] and release notes [2] for 0.6.0. >> If there are features/inclusions will adjust as needed. I intend to >> follow a very similar RM model to what Tony did for the 0.5.x line and >> which follows the apparent consensus from our recent Git discussions. >> >> We'll definitely need folks to really focus in on the existing JIRAs >> slated for 0.6.0 and any necessary reviews or tweaks. Early next week >> we should start moving tickets to the next minor (0.7.0) build that >> aren't key bug fixes or aren't tied to previously stated 0.6.0 goals. >> >> [1] https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance >> [2] https://cwiki.apache.org/confluence/display/NIFI/Release+Notes >> >> Thanks >> Joe >> >> On Wed, Mar 9, 2016 at 6:08 PM, Tony Kurc wrote: >>> Joe, >>> I tagged this one that I've been closing in on, and was just finishing up. >>> >>> https://issues.apache.org/jira/browse/NIFI-1481 >>> >>> On Wed, Mar 9, 2016 at 5:42 PM, Joe Witt wrote: >>> Team, It is time to start pulling in for the Apache NiFi 0.6.0 release to keep with our previously suggested cadence. There are already a lot of really nice improvements/bug fixes on there and some nice new features. We do have about 23 outstanding JIRAs assigned that are open. Most appear to be in great shape with active discussion centered around PRs with review feedback. Some just appear to need the final push over the merge line. There are also some PRs on Github that may well be in a ready state. If there are things which are really important to folks that they do not presently see on the 0.6.0 list but that they really want/need and think are ready or close to ready please advise. I am happy to RM the release but if someone else is interested please advise. Let's try to shoot for Mar 16th vote start so we can be close to the Mar 18th goal talked about a while ago on the 6-12
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56178725 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type. The content of the FlowFile is expected to be in UTF-8 format.") +@ReadsAttributes({ +@ReadsAttribute(attribute = "cql.args.N.type", +description = "Incoming FlowFiles are expected to be parameterized CQL statements. The type of each " ++ "parameter is specified as a lowercase string corresponding to the Cassandra data type (text, " ++ "int, boolean, e.g.). In the case of collections, the primitive type(s) of the elements in the " ++ "collection should be
[GitHub] nifi pull request: NIFI-614 Added initial support for new style JM...
Github user olegz commented on the pull request: https://github.com/apache/nifi/pull/222#issuecomment-196861594 Agreed and the JIRA is https://issues.apache.org/jira/browse/NIFI-1628 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-614 Added initial support for new style JM...
Github user joewitt commented on the pull request: https://github.com/apache/nifi/pull/222#issuecomment-196858986 i am on the same page of this being a good start oleg provided the JIRA is created to enable ssl context to provide info for providers that want it. We don't have support for tibco ssl in apache now anyway. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56176199 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type. The content of the FlowFile is expected to be in UTF-8 format.") --- End diff -- Doh! Good catch, will update --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-614 Added initial support for new style JM...
Github user olegz commented on the pull request: https://github.com/apache/nifi/pull/222#issuecomment-196855425 So the problem with Tibco and SSL is that Tibco does things differently then other providers. In other words they don't use SSLContext and instead rely on setting individual properties inside their implementation of ConnectionFactory. The good thing is we can extract them from SSLContext, but need ti know exactly what we need. That said, propose to release this new JMS support with understanding that it does not yet support Tibco SSL and add it later as part of the enhancement. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56175269 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type. The content of the FlowFile is expected to be in UTF-8 format.") --- End diff -- "The content of the FlowFile is expected to be in UTF-8 format." After adding the charset property I don't believe this is the case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1118 Update SplitText Processor - add supp...
Github user jskora commented on the pull request: https://github.com/apache/nifi/pull/135#issuecomment-196847105 Cancelling this pull request, will followup with new PR for updated and current source branch. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1118 Update SplitText Processor - add supp...
Github user jskora closed the pull request at: https://github.com/apache/nifi/pull/135 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1519 upgraded to the latest version of Spa...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/231 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1519 upgraded to the latest version of Spa...
Github user mattyb149 commented on the pull request: https://github.com/apache/nifi/pull/231#issuecomment-196831085 +1 LGTM, tested the receiver on Spark 1.6.0, was able to retrieve and process data coming from a NiFi Output Port on a different machine --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1464 fixed OnScheduled invocation to pass ...
GitHub user olegz opened a pull request: https://github.com/apache/nifi/pull/279 NIFI-1464 fixed OnScheduled invocation to pass ProcessContext You can merge this pull request into a Git repository by running: $ git pull https://github.com/olegz/nifi NIFI-1464C Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/279.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #279 commit 05bd552941dd496ec219d4ce724591766b6c4351 Author: Oleg ZhurakouskyDate: 2016-03-15T13:48:29Z NIFI-1464 fixed OnScheduled invocation to pass ProcessContext --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-1464 life-cycle refactoring part-2
Github user olegz closed the pull request at: https://github.com/apache/nifi/pull/275 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---