[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2755


---


[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-11 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r194413976
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -397,6 +398,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 }
 
 hiveStreamingConnection = makeStreamingConnection(options, reader);
+// Add shutdown handler with higher priority than FileSystem shutdown hook so that streaming connection gets closed first before
+// filesystem close (to avoid ClosedChannelException)
+ShutdownHookManager.addShutdownHook(hiveStreamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
--- End diff --

We have full exception handling in the framework, but I still need to 
abort and close on other uncaught exceptions, so I changed the 
catch(Exception e) to catch(Throwable t)
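
A minimal sketch of the two ideas in this exchange: registering the connection's close() ahead of Hadoop's FileSystem shutdown hook, and widening the catch so any Throwable aborts the transaction before the connection is closed. This is an illustration, not the PR's code; the StreamingConnection methods (beginTransaction, write, commitTransaction, abortTransaction, close) and the static ShutdownHookManager.addShutdownHook(Runnable, int) are assumed from the imports shown in the diff.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;

public class StreamingSafetySketch {

    // Hooks with higher priority run earlier, so the streaming connection is
    // flushed and closed before Hadoop closes its cached FileSystem instances
    // (avoiding the ClosedChannelException mentioned in the diff).
    public static void registerShutdownHook(StreamingConnection connection) {
        ShutdownHookManager.addShutdownHook(connection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
    }

    // catch (Throwable t) rather than catch (Exception e): Errors and unchecked
    // exceptions thrown by other code also abort the open transaction before the
    // connection closes, so a half-written transaction cannot leave broken files.
    public static void writeSafely(StreamingConnection connection, byte[] row) throws StreamingException {
        try {
            connection.beginTransaction();
            connection.write(row);
            connection.commitTransaction();
        } catch (Throwable t) {
            try {
                connection.abortTransaction();
            } finally {
                connection.close();
            }
            throw t; // precise rethrow: only StreamingException or unchecked types reach here
        }
    }
}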


---


[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-10 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r194301151
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -397,6 +398,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 }
 
 hiveStreamingConnection = makeStreamingConnection(options, reader);
+// Add shutdown handler with higher priority than FileSystem shutdown hook so that streaming connection gets closed first before
+// filesystem close (to avoid ClosedChannelException)
+ShutdownHookManager.addShutdownHook(hiveStreamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
--- End diff --

You may also want to add an uncaught exception handler, as I have seen 
instances where a runtime exception or illegal state exception thrown by some 
other code, if not caught, can create broken files. 
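
A hedged sketch of that suggestion (not code from the PR): install a default uncaught-exception handler that closes the streaming connection before delegating to any previously installed handler, so a thread dying with an unchecked exception does not leave partially written files behind.

import org.apache.hive.streaming.StreamingConnection;

public class CloseOnUncaughtException {

    public static void install(final StreamingConnection connection) {
        final Thread.UncaughtExceptionHandler previous = Thread.getDefaultUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            try {
                connection.close(); // StreamingConnection.close() declares no checked exceptions
            } finally {
                if (previous != null) {
                    previous.uncaughtException(thread, throwable); // keep prior behavior
                }
            }
        });
    }
}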


---


[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-06 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193413841
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveStreamingConnection;
+import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.SerializationError;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StreamingIOFailure;
+import org.apache.hive.streaming.TransactionError;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
++ "The partition values are expected to be the 'last' fields

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-06 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193412766
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+protected RecordReader recordReader;
+protected ComponentLog log;
+protected List<String> columnNames;
+protected StructTypeInfo schema;
+
+protected StandardStructObjectInspector cachedObjectInspector;
+protected TimestampParser tsParser;
+
+private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
+
+public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+this.recordReader = recordReader;
+this.log = log;
+}
+
+@Override
+public void initialize(Configuration conf, Properties tbl) {
+List<TypeInfo> columnTypes;
+StructTypeInfo rowTypeInfo;
+
+log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
+
+// Get column names and types
+String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+// all table column names
+if (columnNameProperty.isEmpty()) {
+columnNames = new ArrayList<>(0);
+} else {
+columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+}
+
+// all column types
+if (columnTypeProperty.isEmpty()) {
+columnTypes = new ArrayList<>(0);
+} else {
+columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+}
+
+log.debug("columns: {}, {

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193185676
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+protected RecordReader recordReader;
+protected ComponentLog log;
+protected List<String> columnNames;
+protected StructTypeInfo schema;
+
+protected StandardStructObjectInspector cachedObjectInspector;
+protected TimestampParser tsParser;
+
+private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
+
+public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+this.recordReader = recordReader;
+this.log = log;
+}
+
+@Override
+public void initialize(Configuration conf, Properties tbl) {
+List<TypeInfo> columnTypes;
+StructTypeInfo rowTypeInfo;
+
+log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
+
+// Get column names and types
+String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+// all table column names
+if (columnNameProperty.isEmpty()) {
+columnNames = new ArrayList<>(0);
+} else {
+columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+}
+
+// all column types
+if (columnTypeProperty.isEmpty()) {
+columnTypes = new ArrayList<>(0);
+} else {
+columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+}
+
+log.debug("columns: {}, {

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193184727
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveStreamingConnection;
+import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.SerializationError;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StreamingIOFailure;
+import org.apache.hive.streaming.TransactionError;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
++ "The partition values are expected to be the 'last' fields

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193184650
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+protected RecordReader recordReader;
+protected ComponentLog log;
+protected List<String> columnNames;
+protected StructTypeInfo schema;
+
+protected StandardStructObjectInspector cachedObjectInspector;
+protected TimestampParser tsParser;
+
+private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
+
+public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+this.recordReader = recordReader;
+this.log = log;
+}
+
+@Override
+public void initialize(Configuration conf, Properties tbl) {
+List<TypeInfo> columnTypes;
+StructTypeInfo rowTypeInfo;
+
+log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
+
+// Get column names and types
+String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+// all table column names
+if (columnNameProperty.isEmpty()) {
+columnNames = new ArrayList<>(0);
+} else {
+columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+}
+
+// all column types
+if (columnTypeProperty.isEmpty()) {
+columnTypes = new ArrayList<>(0);
+} else {
+columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+}
+
+log.debug("columns: {}, {

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193184086
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveStreamingConnection;
+import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.SerializationError;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StreamingIOFailure;
+import org.apache.hive.streaming.TransactionError;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
++ "The partition values are expected to be the 'last' fields

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193183490
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+protected RecordReader recordReader;
+protected ComponentLog log;
+protected List<String> columnNames;
+protected StructTypeInfo schema;
+
+protected StandardStructObjectInspector cachedObjectInspector;
+protected TimestampParser tsParser;
+
+private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
+
+public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+this.recordReader = recordReader;
+this.log = log;
+}
+
+@Override
+public void initialize(Configuration conf, Properties tbl) {
+List<TypeInfo> columnTypes;
+StructTypeInfo rowTypeInfo;
+
+log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
+
+// Get column names and types
+String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+// all table column names
+if (columnNameProperty.isEmpty()) {
+columnNames = new ArrayList<>(0);
+} else {
+columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+}
+
+// all column types
+if (columnTypeProperty.isEmpty()) {
+columnTypes = new ArrayList<>(0);
+} else {
+columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+}
+
+log.debug("columns: {}, {

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193182827
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+protected RecordReader recordReader;
+protected ComponentLog log;
+protected List<String> columnNames;
+protected StructTypeInfo schema;
+
+protected StandardStructObjectInspector cachedObjectInspector;
+protected TimestampParser tsParser;
+
+private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
+
+public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+this.recordReader = recordReader;
+this.log = log;
+}
+
+@Override
+public void initialize(Configuration conf, Properties tbl) {
+List<TypeInfo> columnTypes;
+StructTypeInfo rowTypeInfo;
+
+log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
+
+// Get column names and types
+String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+// all table column names
+if (columnNameProperty.isEmpty()) {
+columnNames = new ArrayList<>(0);
+} else {
+columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+}
+
+// all column types
+if (columnTypeProperty.isEmpty()) {
+columnTypes = new ArrayList<>(0);
+} else {
+columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+}
+
+log.debug("columns: {}, {

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193182526
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.orc.record;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.orc.PutORC.HIVE_DDL_ATTRIBUTE;
+
+/**
+ * HDFSRecordWriter that writes ORC files using Avro as the schema representation.
+ */
+
+public class ORCHDFSRecordWriter implements HDFSRecordWriter {
+
+private final Schema avroSchema;
+private final TypeInfo orcSchema;
+private final Writer orcWriter;
+private final String hiveTableName;
+private final boolean hiveFieldNames;
+
+public ORCHDFSRecordWriter(final Writer orcWriter, final Schema avroSchema, final String hiveTableName, final boolean hiveFieldNames) {
+this.avroSchema = avroSchema;
+this.orcWriter = orcWriter;
+this.hiveFieldNames = hiveFieldNames;
+this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, this.hiveFieldNames);
+this.hiveTableName = hiveTableName;
+}
+
+@Override
+public void write(final Record record) throws IOException {
+List<Schema.Field> fields = avroSchema.getFields();
--- End diff --

Yes, will change.
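
The reviewer comment this replies to is not quoted above; one plausible reading of the marked line, assumed here rather than confirmed by the thread, is to fetch the Avro field list once in the constructor instead of on every write(Record) call. A hypothetical sketch:

import java.util.List;
import org.apache.avro.Schema;

// Hypothetical sketch only: cache Schema.getFields() in the constructor so each
// per-record write reuses the same List<Schema.Field> instead of re-fetching it.
public class CachedFieldsSketch {
    private final List<Schema.Field> fields;

    public CachedFieldsSketch(final Schema avroSchema) {
        this.fields = avroSchema.getFields(); // fetched once, not once per record
    }

    public int fieldCount() {
        return fields.size();
    }
}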


---


[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193182416
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveStreamingConnection;
+import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.SerializationError;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StreamingIOFailure;
+import org.apache.hive.streaming.TransactionError;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
++ "The partition values are expected to be the 'last' fields

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193181866
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveStreamingConnection;
+import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.SerializationError;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StreamingIOFailure;
+import org.apache.hive.streaming.TransactionError;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
++ "The partition values are expected to be the 'last' fields

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193181171
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+protected RecordReader recordReader;
+protected ComponentLog log;
+protected List<String> columnNames;
+protected StructTypeInfo schema;
+
+protected StandardStructObjectInspector cachedObjectInspector;
+protected TimestampParser tsParser;
+
+private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
+
+public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+this.recordReader = recordReader;
+this.log = log;
+}
+
+@Override
+public void initialize(Configuration conf, Properties tbl) {
+List<TypeInfo> columnTypes;
+StructTypeInfo rowTypeInfo;
+
+log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
+
+// Get column names and types
+String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+// all table column names
+if (columnNameProperty.isEmpty()) {
+columnNames = new ArrayList<>(0);
+} else {
+columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+}
+
+// all column types
+if (columnTypeProperty.isEmpty()) {
+columnTypes = new ArrayList<>(0);
+} else {
+columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+}
+
+log.debug("columns: {}, {

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193180621
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+protected RecordReader recordReader;
+protected ComponentLog log;
+protected List columnNames;
+protected StructTypeInfo schema;
+
+protected StandardStructObjectInspector cachedObjectInspector;
+protected TimestampParser tsParser;
+
+private final static Pattern INTERNAL_PATTERN = 
Pattern.compile("_col([0-9]+)");
+
+public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+this.recordReader = recordReader;
+this.log = log;
+}
+
+@Override
+public void initialize(Configuration conf, Properties tbl) {
+List columnTypes;
+StructTypeInfo rowTypeInfo;
+
+log.debug("Initializing NiFiRecordSerDe: {}", 
tbl.entrySet().toArray());
+
+// Get column names and types
+String columnNameProperty = 
tbl.getProperty(serdeConstants.LIST_COLUMNS);
+String columnTypeProperty = 
tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+final String columnNameDelimiter = 
tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : 
String.valueOf(SerDeUtils.COMMA);
+// all table column names
+if (columnNameProperty.isEmpty()) {
+columnNames = new ArrayList<>(0);
+} else {
+columnNames = new 
ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+}
+
+// all column types
+if (columnTypeProperty.isEmpty()) {
+columnTypes = new ArrayList<>(0);
+} else {
+columnTypes = 
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+}
+
+log.debug("columns: {}, {

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193179938
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
 ---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193177877
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
 ---
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveStreamingConnection;
+import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.SerializationError;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StreamingIOFailure;
+import org.apache.hive.streaming.TransactionError;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import 
org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static 
org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow 
file records to an Apache Hive 3.0+ table. "
++ "The partition values are expected to be the 'last' fields

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193178966
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.orc.record;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.orc.PutORC.HIVE_DDL_ATTRIBUTE;
+
+/**
+ * HDFSRecordWriter that writes ORC files using Avro as the schema 
representation.
+ */
+
+public class ORCHDFSRecordWriter implements HDFSRecordWriter {
+
+private final Schema avroSchema;
+private final TypeInfo orcSchema;
+private final Writer orcWriter;
+private final String hiveTableName;
+private final boolean hiveFieldNames;
+
+public ORCHDFSRecordWriter(final Writer orcWriter, final Schema 
avroSchema, final String hiveTableName, final boolean hiveFieldNames) {
+this.avroSchema = avroSchema;
+this.orcWriter = orcWriter;
+this.hiveFieldNames = hiveFieldNames;
+this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, 
this.hiveFieldNames);
+this.hiveTableName = hiveTableName;
+}
+
+@Override
+public void write(final Record record) throws IOException {
+List<Schema.Field> fields = avroSchema.getFields();
+if (fields != null) {
+Object[] row = new Object[fields.size()];
--- End diff --

Same for this array: if the fields do not change, this can be a single 
allocation.
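A minimal sketch of this suggestion, folded together with the related question below about hoisting avroSchema.getFields() out of write(); the class and field names are illustrative, not the PR's code, and it assumes the schema is fixed for the writer's lifetime and that the downstream writer does not retain the row array between calls:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

import java.util.List;

public class ReusableRowSketch {

    private final List<Schema.Field> cachedFields; // fetched once, not per record
    private final Object[] reusableRow;            // single allocation, reused

    public ReusableRowSketch(final Schema avroSchema) {
        this.cachedFields = avroSchema.getFields();
        this.reusableRow = new Object[cachedFields.size()];
    }

    // Stand-in for write(Record): fill the reusable buffer instead of
    // allocating a new Object[] for every record.
    public Object[] fillRow(final Object[] recordValues) {
        for (int i = 0; i < cachedFields.size(); i++) {
            reusableRow[i] = recordValues[i];
        }
        return reusableRow;
    }

    public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("example").fields()
                .requiredInt("id")
                .requiredString("name")
                .endRecord();
        ReusableRowSketch sketch = new ReusableRowSketch(schema);
        System.out.println(sketch.fillRow(new Object[]{1, "a"})[1]);
    }
}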


---


[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193178839
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.orc.record;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.orc.PutORC.HIVE_DDL_ATTRIBUTE;
+
+/**
+ * HDFSRecordWriter that writes ORC files using Avro as the schema 
representation.
+ */
+
+public class ORCHDFSRecordWriter implements HDFSRecordWriter {
+
+private final Schema avroSchema;
+private final TypeInfo orcSchema;
+private final Writer orcWriter;
+private final String hiveTableName;
+private final boolean hiveFieldNames;
+
+public ORCHDFSRecordWriter(final Writer orcWriter, final Schema 
avroSchema, final String hiveTableName, final boolean hiveFieldNames) {
+this.avroSchema = avroSchema;
+this.orcWriter = orcWriter;
+this.hiveFieldNames = hiveFieldNames;
+this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, 
this.hiveFieldNames);
+this.hiveTableName = hiveTableName;
+}
+
+@Override
+public void write(final Record record) throws IOException {
+List<Schema.Field> fields = avroSchema.getFields();
--- End diff --

If the fields do not change, can this be moved outside of the inner loop?


---


[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193173275
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
 ---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193174462
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
 ---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-05 Thread prasanthj
Github user prasanthj commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r193176667
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
 ---

[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-04 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2755#discussion_r192763045
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+public class HiveRecordWriter extends AbstractRecordWriter {
--- End diff --

@prasanthj Do you mind taking a look at HiveRecordWriter and 
NiFiRecordSerDe (and PutHive3Streaming, which uses them when creating the 
connection and passing in options)? Those are the custom impls for the new Hive 
Streaming API classes; hoping for suggestions on improving performance, etc. 
Thanks in advance!
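For anyone less familiar with the new API, a minimal sketch of how a record writer gets wired into a Hive 3 streaming connection; it uses the stock StrictDelimitedInputWriter and made-up database/table names instead of the PR's HiveRecordWriter, and assumes a reachable metastore:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

import java.nio.charset.StandardCharsets;

public class StreamingConnectionSketch {
    public static void main(String[] args) throws StreamingException {
        // Assumes hive-site.xml (metastore URI, etc.) is on the classpath.
        HiveConf hiveConf = new HiveConf();
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
                .withFieldDelimiter(',')
                .build();
        StreamingConnection connection = HiveStreamingConnection.newBuilder()
                .withDatabase("default")        // illustrative
                .withTable("example_table")     // illustrative
                .withAgentInfo("example-agent")
                .withRecordWriter(writer)
                .withHiveConf(hiveConf)
                .connect();
        try {
            connection.beginTransaction();
            connection.write("1,foo".getBytes(StandardCharsets.UTF_8));
            connection.commitTransaction();
        } finally {
            connection.close();
        }
    }
}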


---


[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle

2018-06-04 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/2755

NIFI-4963: Added Hive3 bundle

You'll need to activate the include-hive3 profile when building the 
assembly; it is currently excluded by default due to its size (~200 MB).
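For example, assuming the standard Maven profile flag (exact invocation may vary):

$ mvn clean install -Pinclude-hive3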

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via `mvn 
-Pcontrib-check clean install` at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [x] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [x] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [x] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [x] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-4963

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2755


commit 417bc821d277a0556842f5aa734d854ca225147b
Author: Matthew Burgess 
Date:   2018-06-04T14:29:08Z

NIFI-4963: Added Hive3 bundle




---