[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16619029#comment-16619029 ] ASF GitHub Bot commented on NIFI-5510: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2992 > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16619027#comment-16619027 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2992 Merged. Thanks for the contribution. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16619025#comment-16619025 ] ASF subversion and git services commented on NIFI-5510: --- Commit c56a7e9ba5808ead129d462c369e73c710dd3145 in nifi's branch refs/heads/master from zenfenan [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=c56a7e9 ] NIFI-5510: Introducing PutCassandraRecord processor NIFI-5510: Fixes for PR review comments This closes #2992 Signed-off-by: Mike Thomsen > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16619024#comment-16619024 ] ASF subversion and git services commented on NIFI-5510: --- Commit c56a7e9ba5808ead129d462c369e73c710dd3145 in nifi's branch refs/heads/master from zenfenan [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=c56a7e9 ] NIFI-5510: Introducing PutCassandraRecord processor NIFI-5510: Fixes for PR review comments This closes #2992 Signed-off-by: Mike Thomsen > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618063#comment-16618063 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r218199589 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("This is a record aware processor that reads the content of the incoming FlowFile as individual records using the " + +"configured 'Record Reader' and writes them to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + +static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() +.name("put-cassandra-record-reader") +.displayName("Record Reader") +.description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + +"and determining the schema") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() +.name("put-cassandra-record-table") +.displayName("Table name") +.description("The name of the Cassandra table to which the records have to be written.") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) +
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618064#comment-16618064 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r218200235 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("This is a record aware processor that reads the content of the incoming FlowFile as individual records using the " + +"configured 'Record Reader' and writes them to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + +static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() +.name("put-cassandra-record-reader") +.displayName("Record Reader") +.description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + +"and determining the schema") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() +.name("put-cassandra-record-table") +.displayName("Table name") +.description("The name of the Cassandra table to which the records have to be written.") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) +
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618055#comment-16618055 ] ASF GitHub Bot commented on NIFI-5510: -- Github user zenfenan commented on the issue: https://github.com/apache/nifi/pull/2992 I think we can merge this one, if everything checks out good instead of waiting. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618031#comment-16618031 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2992 Sounds good. We can put this on hold while you do that client service PR. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617774#comment-16617774 ] ASF GitHub Bot commented on NIFI-5510: -- Github user zenfenan commented on the issue: https://github.com/apache/nifi/pull/2992 Yeah, I have actually started the implementation for a connection pool service. Wasn't able to open up a Jira. Got held up with some other things. I'll be doing it and will also wrap up the implementation and raise a PR. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617455#comment-16617455 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2992 NP. We should discuss making a controller service that handles the connection pooling for Cassandra. Doesn't have to go in here, but it would be one less thing to update if it were. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617076#comment-16617076 ] ASF GitHub Bot commented on NIFI-5510: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r217948182 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the content of the incoming FlowFile as individual records to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + +static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() +.name("put-cassandra-record-reader") +.displayName("Record Reader") +.description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + +"and determining the schema") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() +.name("put-cassandra-record-table") +.displayName("Table name") +.description("The name of the Cassandra table to which the records have to be written.") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) +
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617074#comment-16617074 ] ASF GitHub Bot commented on NIFI-5510: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r217948138 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the content of the incoming FlowFile as individual records to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + +static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() +.name("put-cassandra-record-reader") +.displayName("Record Reader") +.description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + +"and determining the schema") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() +.name("put-cassandra-record-table") +.displayName("Table name") +.description("The name of the Cassandra table to which the records have to be written.") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) +
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617072#comment-16617072 ] ASF GitHub Bot commented on NIFI-5510: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r217947988 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the content of the incoming FlowFile as individual records to Apache Cassandra using native protocol version 3 or higher.") --- End diff -- Understood. I'll update it. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609600#comment-16609600 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2992 [PutCassandraRecord_Test.xml.txt](https://github.com/apache/nifi/files/2367829/PutCassandraRecord_Test.xml.txt) Validated that it works with that simple flow. I was able to use that to insert a few records and see them with `cqlsh`. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609347#comment-16609347 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r216353534 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the content of the incoming FlowFile as individual records to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + +static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() +.name("put-cassandra-record-reader") +.displayName("Record Reader") +.description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + +"and determining the schema") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() +.name("put-cassandra-record-table") +.displayName("Table name") +.description("The name of the Cassandra table to which the records have to be written.") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) +
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609346#comment-16609346 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r216353686 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the content of the incoming FlowFile as individual records to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + +static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() +.name("put-cassandra-record-reader") +.displayName("Record Reader") +.description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + +"and determining the schema") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() +.name("put-cassandra-record-table") +.displayName("Table name") +.description("The name of the Cassandra table to which the records have to be written.") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) +
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609348#comment-16609348 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r216070172 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the content of the incoming FlowFile as individual records to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + +static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() +.name("put-cassandra-record-reader") +.displayName("Record Reader") +.description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + +"and determining the schema") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() +.name("put-cassandra-record-table") +.displayName("Table name") +.description("The name of the Cassandra table to which the records have to be written.") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) +
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609349#comment-16609349 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r216355406 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml --- @@ -64,5 +64,20 @@ nifi-ssl-context-service test + --- End diff -- No new dependencies from outside of our project, so L looks good. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609350#comment-16609350 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r216354635 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the content of the incoming FlowFile as individual records to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + +static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() +.name("put-cassandra-record-reader") +.displayName("Record Reader") +.description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + +"and determining the schema") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() +.name("put-cassandra-record-table") +.displayName("Table name") +.description("The name of the Cassandra table to which the records have to be written.") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) +
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609351#comment-16609351 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r216353050 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the content of the incoming FlowFile as individual records to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + +static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() +.name("put-cassandra-record-reader") +.displayName("Record Reader") +.description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + +"and determining the schema") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() +.name("put-cassandra-record-table") +.displayName("Table name") +.description("The name of the Cassandra table to which the records have to be written.") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) +
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609345#comment-16609345 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r216070922 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the content of the incoming FlowFile as individual records to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + +static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() +.name("put-cassandra-record-reader") +.displayName("Record Reader") +.description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + +"and determining the schema") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() +.name("put-cassandra-record-table") +.displayName("Table name") +.description("The name of the Cassandra table to which the records have to be written.") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) +
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609344#comment-16609344 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2992#discussion_r216069857 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the content of the incoming FlowFile as individual records to Apache Cassandra using native protocol version 3 or higher.") --- End diff -- It would be helpful to have more content here, particularly explicitly calling out that it is a record-aware processor. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607366#comment-16607366 ] ASF GitHub Bot commented on NIFI-5510: -- Github user zenfenan commented on the issue: https://github.com/apache/nifi/pull/2992 @MikeThomsen No complex setup is required. A Cassandra cluster has to be up and running. That's all. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16604946#comment-16604946 ] ASF GitHub Bot commented on NIFI-5510: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2992 Been meaning to kick the tires on Cassandra a bit. Would a vanilla Docker image do or is there more setup required to work this? > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16604749#comment-16604749 ] ASF GitHub Bot commented on NIFI-5510: -- GitHub user zenfenan opened a pull request: https://github.com/apache/nifi/pull/2992 NIFI-5510: Introducing PutCassandraRecord processor Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zenfenan/nifi NIFI-5510 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2992.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2992 commit 4934a159fb769961c47d3809b522dfaa197fc1ac Author: zenfenan Date: 2018-08-17T09:56:47Z NIFI-5510: Introducing PutCassandraRecord processor > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5510) Allow records to be put directly into Cassandra
[ https://issues.apache.org/jira/browse/NIFI-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577439#comment-16577439 ] Sivaprasanna Sethuraman commented on NIFI-5510: --- I was thinking about having a record based Cassandra processor sometime ago since I had to do unnecessar modifications and tweaks with other processors as you had mentioned. Got occupied with some other tasks so wasn't focusing on this. I think this is a good time to put some light on this and get this done. I'll work on it. > Allow records to be put directly into Cassandra > --- > > Key: NIFI-5510 > URL: https://issues.apache.org/jira/browse/NIFI-5510 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Matt Burgess >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Currently the standard way of getting data into Cassandra through NiFi is to > use PutCassandraQL, which often means raw data needs to be converted to CQL > statements, usually done (with modifications) via ConvertJSONToSQL. > It would be better to have something closer to PutDatabaseRecord, a processor > called PutCassandraRecord perhaps, that would take the raw data and input it > into Cassandra "directly", without the need for the user to convert the data > into CQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)