[GitHub] nifi pull request #2160: [NiFi-4384] - Enhance PutKudu processor to support ...

2017-09-21 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2160#discussion_r140365671
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -94,6 +97,29 @@
 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 .build();
 
+protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
+.name("Flush Mode")
+.description("Set the new flush mode for a kudu session\n" +
+"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
+"AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" +
+" operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" +
+"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
+.allowableValues(SessionConfiguration.FlushMode.values())
+.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
+.required(true)
+.build();
+
+protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+.name("Batch Size")
+.description("Set the number of operations that can be buffered, between 2 - 10. " +
+"Depend on your memory size, and data size per row set an appropriate batch size. " +
+"Gradually increase this number to find out your best one for best performance")
+.defaultValue("100")
--- End diff --

As I noted in the description, it depends on their memory size, the size of 
the rows being inserted, and also their cluster size. Setting the buffer size 
too big won't help, and too small won't help either. As noted, each developer 
has to find this number for their own environment. A lot of people hit peak 
performance at 50 with a single-machine Kudu cluster. My colleague hit peak 
performance at 3500 with a 6-node cluster (10 CPUs, 64 GB memory each). I 
picked 100 somewhat arbitrarily because I saw it in other Put-xxx processors, 
but I don't want to put 1000, since most developers test with a single machine 
and would leave this default value unchanged.
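
The tuning advice in this comment (try several batch sizes and keep the one that performs best in your own environment) can be sketched as a small sweep. This is only an illustration: `BatchSizeSweep`, `pickBest`, and the cost function passed in are hypothetical names, not part of NiFi or the Kudu client, and the cost curve in `main` is made up. In practice the measurement would wrap a real ingest run against your own cluster.

```java
import java.util.function.IntToLongFunction;

// Hypothetical helper: not part of NiFi or the Kudu client.
final class BatchSizeSweep {

    // Returns the candidate batch size with the lowest measured cost
    // (for example, elapsed nanoseconds for a fixed number of rows).
    static int pickBest(int[] candidates, IntToLongFunction measure) {
        if (candidates.length == 0) {
            throw new IllegalArgumentException("no candidates");
        }
        int best = candidates[0];
        long bestCost = measure.applyAsLong(best);
        for (int i = 1; i < candidates.length; i++) {
            long cost = measure.applyAsLong(candidates[i]);
            if (cost < bestCost) {
                bestCost = cost;
                best = candidates[i];
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Stand-in measurement: a made-up cost curve, NOT real Kudu timings.
        IntToLongFunction fakeCost = size -> Math.abs(size - 50L);
        int best = pickBest(new int[] {10, 50, 100, 1000}, fakeCost);
        System.out.println("best batch size under the fake cost curve: " + best);
    }
}
```

Passing the measurement in as a function keeps the sweep independent of how a run is timed, so the same loop could be reused on a single machine and on a larger cluster.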


---


[GitHub] nifi pull request #2160: [NiFi-4384] - Enhance PutKudu processor to support ...

2017-09-21 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2160#discussion_r140362618
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -94,6 +97,29 @@
 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 .build();
 
+protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
+.name("Flush Mode")
+.description("Set the new flush mode for a kudu session\n" +
+"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
+"AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" +
+" operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" +
+"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
+.allowableValues(SessionConfiguration.FlushMode.values())
+.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
+.required(true)
+.build();
+
+protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+.name("Batch Size")
+.description("Set the number of operations that can be buffered, between 2 - 10. " +
+"Depend on your memory size, and data size per row set an appropriate batch size. " +
+"Gradually increase this number to find out your best one for best performance")
+.defaultValue("100")
+.required(true)
+.addValidator(StandardValidators.createLongValidator(2, 10, true))
--- End diff --

A value of 1 wouldn't make sense. First, if it is set to 1, the buffer will 
only ever hold one item, whereas its purpose is to queue up incoming items. 
Second, doing so will significantly degrade performance, since reads are 
always faster than writes.
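
To make the point about a batch size of 1 concrete, here is a toy buffer, assuming the buffer is flushed whenever it reaches the configured size. `CountingBuffer` is a hypothetical stand-in that does not use the Kudu client; it only counts how many flushes a given batch size would cause.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a buffered session; NOT the Kudu client API.
final class CountingBuffer {
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    private int flushCount = 0;

    CountingBuffer(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
    }

    // Queue one operation and flush as soon as the buffer is full.
    void apply(String operation) {
        pending.add(operation);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Send whatever is queued; flushing an empty buffer is not counted.
    void flush() {
        if (!pending.isEmpty()) {
            pending.clear();
            flushCount++;
        }
    }

    int flushCount() {
        return flushCount;
    }

    // Number of flushes needed to write `operations` rows with this batch size.
    static int flushesFor(int operations, int batchSize) {
        CountingBuffer buffer = new CountingBuffer(batchSize);
        for (int i = 0; i < operations; i++) {
            buffer.apply("row-" + i);
        }
        buffer.flush(); // push out the final partial batch, if any
        return buffer.flushCount();
    }

    public static void main(String[] args) {
        System.out.println("batch size 1:   " + flushesFor(1000, 1) + " flushes");
        System.out.println("batch size 100: " + flushesFor(1000, 100) + " flushes");
    }
}
```

With a batch size of 1, every row triggers its own flush in this model, so nothing is ever queued.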


---


[GitHub] nifi pull request #2160: [NiFi-4384] - Enhance PutKudu processor to support ...

2017-09-21 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2160#discussion_r140340082
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -124,11 +148,14 @@ public void OnScheduled(final ProcessContext context) {
 kuduTable = this.getKuduTable(kuduClient, tableName);
 getLogger().debug("Kudu connection successfully initialized");
 }
+
--- End diff --

And, feel free to let me know what else should be adjusted


---


[GitHub] nifi pull request #2160: [NiFi-4384] - Enhance PutKudu processor to support ...

2017-09-21 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2160#discussion_r140339764
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -124,11 +148,14 @@ public void OnScheduled(final ProcessContext context) {
 kuduTable = this.getKuduTable(kuduClient, tableName);
 getLogger().debug("Kudu connection successfully initialized");
 }
+
--- End diff --

Got it, and thank you for your notes @pvillard31 . I will push a fix for 
those soon.


---


[GitHub] nifi pull request #2160: [NiFi-4384] - Enhance PutKudu processor to support ...

2017-09-21 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2160#discussion_r140320346
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -124,11 +148,14 @@ public void OnScheduled(final ProcessContext context) {
 kuduTable = this.getKuduTable(kuduClient, tableName);
 getLogger().debug("Kudu connection successfully initialized");
 }
+
--- End diff --

@pvillard31 , I'm not quite sure I understand. What would you suggest I 
change?


---


[GitHub] nifi pull request #2160: [NiFi-4384] - Enhance PutKudu processor to support ...

2017-09-21 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2160#discussion_r140314637
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -94,6 +95,27 @@
 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 .build();
 
+protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
+.name("Flush Mode")
+.description("Set the new flush mode for a kudu session\n" +
+"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
+"AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" +
+" operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" +
+"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
+.allowableValues(SessionConfiguration.FlushMode.values())
+.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+.name("Batch Size")
+.description("Set the number of operations that can be buffered")
+.defaultValue("100")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
--- End diff --

Excellent!


---


[GitHub] nifi pull request #2160: [NiFi-4384] - Enhance PutKudu processor to support ...

2017-09-21 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2160#discussion_r140314337
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -94,6 +95,27 @@
 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 .build();
 
+protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
+.name("Flush Mode")
+.description("Set the new flush mode for a kudu session\n" +
+"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
+"AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" +
+" operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" +
+"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
+.allowableValues(SessionConfiguration.FlushMode.values())
+.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
--- End diff --

Agree!


---


[GitHub] nifi pull request #2160: [NiFi-4384] - Enhance PutKudu processor to support ...

2017-09-21 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2160#discussion_r140314045
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -94,6 +95,27 @@
 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 .build();
 
+protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
+.name("Flush Mode")
+.description("Set the new flush mode for a kudu session\n" +
--- End diff --

@pvillard31 , different methods, achieving the same purpose :-) I don't 
have a strong opinion. It was the way Ricky suggested in the initial PR #2020 . 
I would leave it like that since it looks straightforward :-)


---


[GitHub] nifi issue #2160: [NiFi-4384] - Enhance PutKudu processor to support batch i...

2017-09-18 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2160
  
Hi @joewitt and @rickysaltzer , would you like to review this PR about 
PutKudu again? :-) It simply adds a couple more configuration options to help 
users speed up the ingestion process. Thanks, 


---


[GitHub] nifi pull request #2160: [NiFi-4384] - Enhance PutKudu processor to support ...

2017-09-18 Thread cammachusa
GitHub user cammachusa opened a pull request:

https://github.com/apache/nifi/pull/2160

[NiFi-4384] - Enhance PutKudu processor to support batch insert

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/InspurUSA/nifi NiFi-4384

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2160.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2160


commit 2ddbeda258421d39b7bd6f1b48cf17ded18743f6
Author: cam <camm...@inspur.com>
Date:   2017-09-14T22:29:08Z

[NiFi-4384] - Enhance PutKudu processor to support batch insert




---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-25 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
Thanks @rickysaltzer , I didn't mean to rush you :-) . It's just that I 
have to report to my boss, since it's the end of the sprint. It was nice 
working with you, and sure, I will contribute more to the community.
Cheers!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-24 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@rickysaltzer , I thought the version should be consistent across all 
files. Anyway, I reverted it to 1.3 for the files above, and rebased to have 
one commit.


---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-22 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@rickysaltzer thanks for the guide :-) Working on it now


---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-18 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@rickysaltzer , I don't know how to do that. Can you give me some detailed 
guidance? (from getting your commit to pushing to this PR)


---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-16 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@rickysaltzer , I looked at your branch; it's identical to my branch, so 
sure, it will work. Just to clarify, what do you mean by "pull it down, ... and 
force push it to this review"? Does that mean pull your branch, merge it into 
mine, and push to the pull request? Or pull your branch and push it to this PR, 
to clean up all the previous commits and have just one? If the second, I 
haven't done a cross push before; can you help with which git commands I should 
run?


---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-16 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@rickysaltzer , hahaha. When will you close this PR? I am waiting for your 
review every day :=)


---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-10 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
In the last commit, I already switched to onScheduled. Thanks for your 
review and code, @rickysaltzer . There are some code style check errors; I'm on 
it now.


---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-10 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@rickysaltzer , a reminder :-)


---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-07 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@rickysaltzer any comments?


---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-04 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
Hi @rickysaltzer , switching to ConfigurationContext still doesn't work; 
everything is still null. I noticed that none of the Put/Getxxx processors 
(e.g. PutHBase, PutParquet, PutCassandra) use OnEnabled; they use OnScheduled 
instead. As I understand it, OnScheduled only gets kicked off once per schedule 
(which can be every 5 or 10 minutes, depending on the user's configuration), 
but onTrigger will be triggered several times, depending on how many flowfiles 
you have in one schedule. @joewitt may have meant OnScheduled, not OnEnabled, 
because I never used OnEnabled in my PutKudu (recall his comments a couple of 
weeks ago about the thread safety of the Kudu client).
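
The lifecycle discussed in this comment can be illustrated with a small stand-alone sketch, assuming the commonly documented NiFi behaviour that an `@OnScheduled` method runs when the processor is started and `onTrigger` is then invoked repeatedly while it runs. The classes below (`LifecycleSketch`, `FakeClient`) are hypothetical and use no NiFi or Kudu types; they only show why an expensive client belongs in the once-per-start hook rather than in the per-trigger method.

```java
// Stand-alone illustration; none of these types come from NiFi or Kudu.
final class LifecycleSketch {

    // Pretend client whose construction is expensive.
    static final class FakeClient {
        static int created = 0;

        FakeClient() {
            created++;
        }

        void write(String row) {
            // no-op in this sketch
        }
    }

    private FakeClient client;
    private int triggers = 0;

    // Plays the role of an @OnScheduled method: runs once per start.
    void onScheduled() {
        client = new FakeClient();
    }

    // Plays the role of onTrigger: runs many times and reuses the client.
    void onTrigger(String row) {
        if (client == null) {
            throw new IllegalStateException("onScheduled was never called");
        }
        client.write(row);
        triggers++;
    }

    // Plays the role of an @OnStopped method: release the client.
    void onStopped() {
        client = null;
    }

    int triggers() {
        return triggers;
    }

    public static void main(String[] args) {
        LifecycleSketch processor = new LifecycleSketch();
        processor.onScheduled();
        for (int i = 0; i < 3; i++) {
            processor.onTrigger("row-" + i);
        }
        processor.onStopped();
        System.out.println("clients created: " + FakeClient.created
                + ", triggers: " + processor.triggers());
    }
}
```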


---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-04 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
Sorry @rickysaltzer , I was in a rush, so I didn't run the tests when I 
switched to OnEnabled. Interestingly, OnEnabled is not being called, so the 
objects defined in it are all null when running the test cases. Maybe I should 
have both OnEnabled (for KuduClient and KuduTable) and OnScheduled for the rest 
of the variables. Any advice?
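
One possible way to picture the null fields: a framework that drives components by annotation only calls the hooks it looks for. The toy runner below is a sketch under that assumption, not NiFi's implementation. The annotations `Scheduled` and `Enabled` and the classes `HookRunner` and `DemoProcessor` are all hypothetical. The runner invokes only methods marked `@Scheduled`, so anything initialised in an `@Enabled` method is never set.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Toy annotation-driven runner; hypothetical, not NiFi's framework code.
final class HookRunner {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Scheduled { }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Enabled { }

    static final class DemoProcessor {
        String fromEnabled;
        String fromScheduled;

        @Enabled
        public void setUpOnEnabled() {
            fromEnabled = "initialised";
        }

        @Scheduled
        public void setUpOnScheduled() {
            fromScheduled = "initialised";
        }
    }

    // Invoke every public no-arg method carrying @Scheduled; ignore all other hooks.
    static void start(Object component) {
        for (Method method : component.getClass().getMethods()) {
            if (method.isAnnotationPresent(Scheduled.class) && method.getParameterCount() == 0) {
                try {
                    method.invoke(component);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("hook failed: " + method.getName(), e);
                }
            }
        }
    }

    public static void main(String[] args) {
        DemoProcessor processor = new DemoProcessor();
        start(processor);
        System.out.println("fromScheduled = " + processor.fromScheduled);
        System.out.println("fromEnabled   = " + processor.fromEnabled);
    }
}
```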


---


[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-08-03 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r131258877
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
+.name("KUDU Masters")
+.description("List all kudu masters's ip with port (e.g. 7051), comma separated")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+.name("Table Name")
+.description("The name of the Kudu Table to put data into")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+.name("record-reader")
+.displayName("Record Reader")
+.description("The service for reading records from incoming flow files.")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
+.name("Skip head line")
+.description("Set it to true if your first line is the header line e.g. column names")
+.allowableValues("true", "false")
+.defaultValue("true")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
+.name("Insert Operation")
+.description("Specify operation for this processor. Insert-Ignore will ignore duplicated rows")
+.allowableValues(OperationType.INSERT.toString(), Opera

[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-08-03 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r131258450
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "database", "NoSQL", "kudu", "HDFS"})
+@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
+"to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." +
+" If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
+
+public class PutKudu extends AbstractKudu {
+
+@Override
+protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+final List<PropertyDescriptor> properties = new ArrayList<>();
+properties.add(KUDU_MASTERS);
+properties.add(TABLE_NAME);
+properties.add(SKIP_HEAD_LINE);
+properties.add(RECORD_READER);
+properties.add(INSERT_OPERATION);
+
+return properties;
+}
+
+@Override
+public Set<Relationship> getRelationships() {
+final Set<Relationship> rels = new HashSet<>();
+rels.add(REL_SUCCESS);
+rels.add(REL_FAILURE);
+return rels;
+}
+
+@Override
+protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+Upsert upsert = kuduTable.newUpsert();
+this.insert(kuduTable, upsert, record, fieldNames);
+return upsert;
+}
+
+@Override
+protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+Insert insert = kuduTable.newInsert();
+this.insert(kuduTable, insert, record, fieldNames);
+return insert;
+}
+
+private void insert(KuduTable kuduTable, Operation operation, Record record, List<String> fieldNames){
+PartialRow row = operation.getRow();
+Schema colSchema = kuduTable.getSchema();
+
+for (String colName : fieldNames) {
+int colIdx = this.getColumnIndex(colSchema, colName);
+if (colIdx != -1) {
+Type colType = colSchema.getColumnByIndex(colIdx).getType();
+
+switch (colType.getDataType()) {
+case BOOL:
+row.addBoolean(colIdx, record.getAsBoolean(colName));
+break;
+case FLOAT:
+row.addFloat(colIdx, record.getAsFloat(colName));
+  

[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-08-03 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r131257922
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "database", "NoSQL", "kudu", "HDFS"})
+@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
+"to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." +
+" If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
+
+public class PutKudu extends AbstractKudu {
+
+@Override
+protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+final List<PropertyDescriptor> properties = new ArrayList<>();
+properties.add(KUDU_MASTERS);
+properties.add(TABLE_NAME);
+properties.add(SKIP_HEAD_LINE);
+properties.add(RECORD_READER);
+properties.add(INSERT_OPERATION);
+
+return properties;
+}
+
+@Override
+public Set<Relationship> getRelationships() {
+final Set<Relationship> rels = new HashSet<>();
+rels.add(REL_SUCCESS);
+rels.add(REL_FAILURE);
+return rels;
+}
+
+@Override
+protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+Upsert upsert = kuduTable.newUpsert();
+this.insert(kuduTable, upsert, record, fieldNames);
+return upsert;
+}
+
+@Override
+protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+Insert insert = kuduTable.newInsert();
+this.insert(kuduTable, insert, record, fieldNames);
+return insert;
+}
+
+private void insert(KuduTable kuduTable, Operation operation, Record record, List<String> fieldNames) {
+PartialRow row = operation.getRow();
+Schema colSchema = kuduTable.getSchema();
+
+for (String colName : fieldNames) {
+int colIdx = this.getColumnIndex(colSchema, colName);
+if (colIdx != -1) {
+Type colType = colSchema.getColumnByIndex(colIdx).getType();
+
+switch (colType.getDataType()) {
+case BOOL:
+row.addBoolean(colIdx, record.getAsBoolean(colName));
+break;
+case FLOAT:
+row.addFloat(colIdx, record.getAsFloat(colName));
+  

[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-08-03 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r131257343
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
+.name("KUDU Masters")
+.description("List all kudu masters's ip with port (e.g. 7051), comma separated")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+.name("Table Name")
+.description("The name of the Kudu Table to put data into")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+.name("record-reader")
+.displayName("Record Reader")
+.description("The service for reading records from incoming flow files.")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
+.name("Skip head line")
+.description("Set it to true if your first line is the header line e.g. column names")
+.allowableValues("true", "false")
+.defaultValue("true")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
+.name("Insert Operation")
+.description("Specify operation for this processor. Insert-Ignore will ignore duplicated rows")
+.allowableValues(OperationType.INSE

[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-07-31 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@joewitt, would you please help? I can't start NiFi; it keeps failing with 
"ClassNotFoundException: org.apache.nifi.serialization.RecordReaderFactory". The 
build completes without any error, but the exception appears when NiFi starts. 
Is one of my RecordReader dependencies incorrect or missing? Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-07-31 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
Hi @joewitt and @rickysaltzer, I am following PutParquet to implement 
PutKudu, so it has the same dependencies as PutParquet, and it builds 
successfully. But the build on Travis always fails. I realized this morning 
that the reason is that the Record Reader is not properly referenced and 
deployed for my PutKudu (I deployed it manually to test PutKudu first). I 
checked the log, and here is the message: "ClassNotFoundException: 
org.apache.nifi.serialization.RecordReaderFactory"
I compared every file of PutParquet with my PutKudu but couldn't find 
anything missing. Would you please advise?




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-07-28 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
It looks like I messed up the history again. I just pushed a commit to the 
branch, and the comments are gone :(
@rickysaltzer, can you review it again?




[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-28 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r130180698
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
+.name("KUDU Masters")
+.description("List all kudu masters's ip with port (e.g. 7051), comma separated")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+.name("Table Name")
+.description("The name of the Kudu Table to put data into")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+.name("record-reader")
+.displayName("Record Reader")
+.description("The service for reading records from incoming flow files.")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
+.name("Skip head line")
+.description("Set it to true if your first line is the header line e.g. column names")
+.allowableValues("true", "false")
+.defaultValue("true")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final Relationship REL_SUCCESS = new Relationship.Builder()
+.name("success")
+.description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
+.build();
+protected static final Relationship REL_FAILURE = new Relationship.Builder()
+.name("failure")
+.description("A

[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-28 Thread cammachusa
Github user cammachusa commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r130165625
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.KuduTable;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "database", "NoSQL", "kudu", "HDFS"})
+@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
+"to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." +
+" If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
+
+public class PutKudu extends AbstractKudu {
+
+@Override
+protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+final List<PropertyDescriptor> properties = new ArrayList<>();
+properties.add(KUDU_MASTERS);
+properties.add(TABLE_NAME);
+properties.add(SKIP_HEAD_LINE);
+properties.add(RECORD_READER);
+
+return properties;
+}
+
+@Override
+public Set<Relationship> getRelationships() {
+final Set<Relationship> rels = new HashSet<>();
+rels.add(REL_SUCCESS);
+rels.add(REL_FAILURE);
+return rels;
+}
+
+@Override
+protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+Insert insert = kuduTable.newInsert();
+PartialRow row = insert.getRow();
+Schema colSchema = kuduTable.getSchema();
+
+for (String colName : fieldNames) {
+int colIdx = this.getColumnIndex(colSchema, colName);
+if (colIdx != -1) {
+Type colType = colSchema.getColumnByIndex(colIdx).getType();
+
+switch (colType.getDataType()) {
+case BOOL:
+row.addBoolean(colIdx, record.getAsBoolean(colName));
+break;
+case FLOAT:
+row.addFloat(colIdx, record.getAsFloat(colName));
+break;
+case DOUBLE:
+row.addDouble(colIdx, record.getAsDouble(colName));
+break;
+case BINARY:
+row.addBinary(colIdx, record.getAsString(colName).getBytes());
+break;
+case INT8:
+case INT16:
--- End diff --

case INT8:
case INT16:
I have to throw an exception here, because the RecordReader doesn't have 
getAsShort(), only getAsInt(), and if I use getAsInt() for INT8 and INT16, it 
will throw an exception.
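
One possible way around the missing getAsShort() is to read the value as an int and narrow it with an explicit range check before handing it to the row. The sketch below is only an illustration and is not code from this PR: the class and method names (IntNarrowing, toByteExact, toShortExact) are made up, and it assumes the narrowed value would then be passed to PartialRow.addByte() or PartialRow.addShort() for INT8 and INT16 columns.

```java
// Hypothetical helper, not part of the PR: narrows an int (e.g. the result of
// record.getAsInt(colName)) to the width of a Kudu INT8 or INT16 column.
final class IntNarrowing {

    private IntNarrowing() {
    }

    /** Narrows an int to a byte, rejecting values outside the INT8 range. */
    static byte toByteExact(int value) {
        if (value < Byte.MIN_VALUE || value > Byte.MAX_VALUE) {
            throw new ArithmeticException("value " + value + " does not fit in INT8");
        }
        return (byte) value;
    }

    /** Narrows an int to a short, rejecting values outside the INT16 range. */
    static short toShortExact(int value) {
        if (value < Short.MIN_VALUE || value > Short.MAX_VALUE) {
            throw new ArithmeticException("value " + value + " does not fit in INT16");
        }
        return (short) value;
    }

    public static void main(String[] args) {
        // In-range values are narrowed unchanged.
        System.out.println(toByteExact(127));
        System.out.println(toShortExact(-32768));

        // Out-of-range values are rejected instead of silently wrapping.
        try {
            toByteExact(128);
        } catch (ArithmeticException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a helper like this, the INT8 and INT16 cases could in principle call row.addByte(colIdx, IntNarrowing.toByteExact(record.getAsInt(colName))) and the addShort equivalent instead of throwing, though that has not been tested against this processor.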



[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-07-21 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
Hi @joewitt, there is a sandbox with a Kudu instance (and related 
components), so you can quickly spin up a VM and test the processor: 
https://kudu.apache.org/docs/quickstart.html
If you need something different, I can also provision an AWS VM with Kudu 
installed and give you access.
By the way, can you find me a second reviewer?
Thanks,





[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-07-19 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@joewitt, my apologies again. I'm still new to the process, so I messed it 
up a bit. I did some manual testing with 2 different Kudu environments, using 
a CSV file as the source. In case you or other reviewers ask about a GetKudu 
processor, I will add one after this PutKudu gets approved.
Thanks,





[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-07-19 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@joewitt, thanks for the good points. I looked at PutCassandra, PutHBase, 
and PutKite to build this PutKudu, and so forgot to move those objects to 
onTrigger when following your suggestion to use PutParquet as an example. I 
just fixed it; please review and let me know what else I should adjust.
Thanks




[GitHub] nifi pull request #1874: NiFi 3973 - Create a new Kudu Processor to ingest d...

2017-07-19 Thread cammachusa
Github user cammachusa closed the pull request at:

https://github.com/apache/nifi/pull/1874




[GitHub] nifi issue #1874: NiFi 3973 - Create a new Kudu Processor to ingest data

2017-07-18 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/1874
  
@joewitt, would you please help me review this pull request: 
https://github.com/apache/nifi/pull/2020
I created a new one for the same ticket because this PR can't trigger a 
build in Travis.
Thanks,




[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-18 Thread cammachusa
GitHub user cammachusa opened a pull request:

https://github.com/apache/nifi/pull/2020

[NiFi-3973] Add PutKudu Processor for ingesting data to Kudu (2nd)

…mits)

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/InspurUSA/nifi NiFi-3973

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2020.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2020


commit 53da81e6b028bb4f269a8f9a19357e6b0c31511a
Author: cam <camm...@inspur.com>
Date:   2017-06-07T01:25:41Z

NiFi 3973 Create a new Kudu Processor to ingest data (combining 5 commits)






[GitHub] nifi issue #1874: NiFi 3973 - Create a new Kudu Processor to ingest data

2017-06-22 Thread cammachusa
Github user cammachusa commented on the issue:

https://github.com/apache/nifi/pull/1874
  
Thanks @joewitt for your review and comments. I'm working on fixing the 
issues.




[GitHub] nifi pull request #1874: NiFi 3973 - Create a new Kudu Processor to ingest d...

2017-05-31 Thread cammachusa
GitHub user cammachusa opened a pull request:

https://github.com/apache/nifi/pull/1874

NiFi 3973 - Create a new Kudu Processor to ingest data  

NiFi 3973 - Create a new Kudu Processor to ingest data into Kudu data 
storage. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/InspurUSA/nifi kudu-processor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/1874.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1874


commit d86afc8bdee940d613e7890aa7e4407d7b865a76
Author: cam <cam@camdev02.redmond1>
Date:   2017-05-27T00:53:56Z

create Kudu processor

commit fe99b9ebf80c39fb4042fdf5cebc89c8dc69a819
Author: cam <cam@camdev02.redmond1>
Date:   2017-05-27T00:57:08Z

Create Kudu processor

commit 96ac872f9d5cb2e75c1affe9fa0a1d95e07d7f2d
Author: cam <camm...@inspur.com>
Date:   2017-05-30T18:42:07Z

added more test cases

commit a31c9c2f0e0a9b85ce00921f732a912e29c41c13
Author: cam <camm...@inspur.com>
Date:   2017-05-31T19:15:10Z

add 2 more test cases



