[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2755 ---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r194413976
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -397,6 +398,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         }
         hiveStreamingConnection = makeStreamingConnection(options, reader);
+        // Add a shutdown hook with higher priority than the FileSystem shutdown hook so that the streaming connection
+        // is closed before the filesystem is closed (to avoid a ClosedChannelException)
+        ShutdownHookManager.addShutdownHook(hiveStreamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
--- End diff --
We have full exception handling in the framework, but I also need to abort and close on other uncaught exceptions, so I changed the catch (Exception e) to catch (Throwable t).
---
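The change described above, widening catch (Exception) to catch (Throwable) so the transaction is aborted and the connection closed on any uncaught error, can be sketched in isolation. The `Connection` interface and `runGuarded` helper below are hypothetical stand-ins, not NiFi or Hive Streaming APIs:

```java
// Hypothetical sketch (not NiFi code): abort and close on any Throwable,
// including Errors, so no broken files are left behind.
public class StreamingGuard {
    public interface Connection {
        void abortTransaction();
        void close();
    }

    public static void runGuarded(Connection conn, Runnable work) {
        try {
            work.run();
        } catch (Throwable t) {
            try {
                conn.abortTransaction();   // roll back the open transaction
            } finally {
                conn.close();              // always release the connection
            }
            // Precise rethrow (Java 7+): Runnable.run declares no checked
            // exceptions, so no "throws Throwable" clause is needed here.
            throw t;
        }
    }
}
```

Catching Throwable rather than Exception matters because an Error (e.g. OutOfMemoryError in some other code path) would otherwise skip the abort and leave a half-written file behind.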
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r194301151
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -397,6 +398,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         }
         hiveStreamingConnection = makeStreamingConnection(options, reader);
+        // Add a shutdown hook with higher priority than the FileSystem shutdown hook so that the streaming connection
+        // is closed before the filesystem is closed (to avoid a ClosedChannelException)
+        ShutdownHookManager.addShutdownHook(hiveStreamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
--- End diff --
You may also want to add an uncaught exception handler; I have seen instances where a runtime exception or an IllegalStateException thrown by some other code, if not caught, can create broken files.
---
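The diff quoted above registers the close hook at FileSystem.SHUTDOWN_HOOK_PRIORITY + 1 because Hadoop's ShutdownHookManager runs hooks in descending priority order, so the streaming connection closes before the FileSystem does. A minimal plain-JDK analogue of that ordering rule (this `OrderedHooks` class is illustrative only, not the Hadoop implementation):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative only: mimics ShutdownHookManager's rule that
// higher-priority hooks run earlier at shutdown.
public class OrderedHooks {
    private static final class Hook {
        final Runnable runnable;
        final int priority;
        Hook(Runnable runnable, int priority) {
            this.runnable = runnable;
            this.priority = priority;
        }
    }

    private final List<Hook> hooks = new ArrayList<>();

    public void addShutdownHook(Runnable runnable, int priority) {
        hooks.add(new Hook(runnable, priority));
    }

    // Run hooks highest-priority first, as Hadoop does at JVM shutdown.
    public void runAll() {
        hooks.stream()
             .sorted(Comparator.comparingInt((Hook h) -> h.priority).reversed())
             .forEach(h -> h.runnable.run());
    }
}
```

With this ordering, a hook at priority N + 1 always fires before one at priority N, which is exactly why "+ 1" over the FileSystem priority avoids the ClosedChannelException.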
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193413841
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveStreamingConnection;
+import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.SerializationError;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StreamingIOFailure;
+import org.apache.hive.streaming.TransactionError;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
+        + "The partition values are expected to be the 'last' fields
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193412766
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+    protected RecordReader recordReader;
+    protected ComponentLog log;
+    protected List columnNames;
+    protected StructTypeInfo schema;
+
+    protected StandardStructObjectInspector cachedObjectInspector;
+    protected TimestampParser tsParser;
+
+    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
+
+    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+        this.recordReader = recordReader;
+        this.log = log;
+    }
+
+    @Override
+    public void initialize(Configuration conf, Properties tbl) {
+        List columnTypes;
+        StructTypeInfo rowTypeInfo;
+
+        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
+
+        // Get column names and types
+        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+        // all table column names
+        if (columnNameProperty.isEmpty()) {
+            columnNames = new ArrayList<>(0);
+        } else {
+            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+        }
+
+        // all column types
+        if (columnTypeProperty.isEmpty()) {
+            columnTypes = new ArrayList<>(0);
+        } else {
+            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+        }
+
+        log.debug("columns: {}, {
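The initialize() logic quoted above splits the table's column names on a configurable delimiter (defaulting to a comma) and parses the column types from the serde table properties. The name-splitting part can be sketched standalone; the `ColumnNameParser` helper below is illustrative, and the literal property keys are assumed stand-ins for the `serdeConstants.LIST_COLUMNS` / `serdeConstants.COLUMN_NAME_DELIMITER` constants:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Illustrative helper: extract column names the way the quoted initialize()
// does, falling back to "," when no delimiter property is present.
public class ColumnNameParser {
    // Assumed stand-ins for serdeConstants.LIST_COLUMNS and
    // serdeConstants.COLUMN_NAME_DELIMITER.
    static final String LIST_COLUMNS = "columns";
    static final String COLUMN_NAME_DELIMITER = "column.name.delimiter";

    public static List<String> parseColumnNames(Properties tbl) {
        // Default to "" so a missing property yields an empty column list.
        String columnNameProperty = tbl.getProperty(LIST_COLUMNS, "");
        String delimiter = tbl.containsKey(COLUMN_NAME_DELIMITER)
                ? tbl.getProperty(COLUMN_NAME_DELIMITER)
                : ",";
        if (columnNameProperty.isEmpty()) {
            return new ArrayList<>(0);
        }
        // Note: String.split treats the delimiter as a regex, exactly as the
        // quoted code does.
        return new ArrayList<>(Arrays.asList(columnNameProperty.split(delimiter)));
    }
}
```

The type list follows the same empty-check pattern, but delegates the actual parsing to Hive's TypeInfoUtils.getTypeInfosFromTypeString, which is not reproduced here.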
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193185676
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193184727
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193184650
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193184086
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193183490 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+    protected RecordReader recordReader;
+    protected ComponentLog log;
+    protected List<String> columnNames;
+    protected StructTypeInfo schema;
+
+    protected StandardStructObjectInspector cachedObjectInspector;
+    protected TimestampParser tsParser;
+
+    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
+
+    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+        this.recordReader = recordReader;
+        this.log = log;
+    }
+
+    @Override
+    public void initialize(Configuration conf, Properties tbl) {
+        List<TypeInfo> columnTypes;
+        StructTypeInfo rowTypeInfo;
+
+        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
+
+        // Get column names and types
+        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+        // all table column names
+        if (columnNameProperty.isEmpty()) {
+            columnNames = new ArrayList<>(0);
+        } else {
+            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+        }
+
+        // all column types
+        if (columnTypeProperty.isEmpty()) {
+            columnTypes = new ArrayList<>(0);
+        } else {
+            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+        }
+
+        log.debug("columns: {}, {
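The `initialize()` method above resolves the table's column names from the `columns` serde property, split on an optional custom delimiter (`column.name.delimiter`), defaulting to a comma. A minimal stdlib-only stand-in for that parsing logic is sketched below; the property keys match the real `serdeConstants` values, but the class name is illustrative and the Hive types are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Stand-in for the column-name parsing in NiFiRecordSerDe.initialize():
// names come from the "columns" property; the delimiter comes from
// "column.name.delimiter" when present, otherwise a comma (SerDeUtils.COMMA).
public class ColumnPropertyParsing {

    static final String LIST_COLUMNS = "columns";                    // serdeConstants.LIST_COLUMNS
    static final String COLUMN_NAME_DELIMITER = "column.name.delimiter"; // serdeConstants.COLUMN_NAME_DELIMITER

    static List<String> parseColumnNames(Properties tbl) {
        String columnNameProperty = tbl.getProperty(LIST_COLUMNS, "");
        // Fall back to comma when no explicit delimiter is configured.
        // Note: String.split treats the delimiter as a regex, just as the serde does.
        String delimiter = tbl.containsKey(COLUMN_NAME_DELIMITER)
                ? tbl.getProperty(COLUMN_NAME_DELIMITER)
                : ",";
        if (columnNameProperty.isEmpty()) {
            return new ArrayList<>(0);
        }
        return new ArrayList<>(Arrays.asList(columnNameProperty.split(delimiter)));
    }

    public static void main(String[] args) {
        Properties tbl = new Properties();
        tbl.setProperty(LIST_COLUMNS, "id,name,ts");
        System.out.println(parseColumnNames(tbl)); // [id, name, ts]
    }
}
```

One subtlety worth noting: because the delimiter is passed straight to `split`, regex metacharacters (e.g. `|`) in a custom delimiter would be interpreted as patterns rather than literals.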
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193182827 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java --- @@ -0,0 +1,247 @@ ---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193182526

--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.orc.record;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.orc.PutORC.HIVE_DDL_ATTRIBUTE;
+
+/**
+ * HDFSRecordWriter that writes ORC files using Avro as the schema representation.
+ */
+
+public class ORCHDFSRecordWriter implements HDFSRecordWriter {
+
+    private final Schema avroSchema;
+    private final TypeInfo orcSchema;
+    private final Writer orcWriter;
+    private final String hiveTableName;
+    private final boolean hiveFieldNames;
+
+    public ORCHDFSRecordWriter(final Writer orcWriter, final Schema avroSchema, final String hiveTableName, final boolean hiveFieldNames) {
+        this.avroSchema = avroSchema;
+        this.orcWriter = orcWriter;
+        this.hiveFieldNames = hiveFieldNames;
+        this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, this.hiveFieldNames);
+        this.hiveTableName = hiveTableName;
+    }
+
+    @Override
+    public void write(final Record record) throws IOException {
+        List<Schema.Field> fields = avroSchema.getFields();

--- End diff --

Yes will change

---
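The review point agreed to above ("Yes will change") is that `avroSchema.getFields()` is invoked on every `write(Record)` call even though the schema never changes; the fix is to resolve the field list once in the constructor. A hedged stdlib-only sketch of that pattern follows, with `RecordSchema` as a simplified stand-in for the Avro `Schema` type used by `ORCHDFSRecordWriter`.

```java
import java.util.List;

// Sketch of hoisting a per-record schema lookup into the constructor.
// RecordSchema is a simplified stand-in for the Avro Schema type; the
// class name and counter are illustrative, not the real ORC writer API.
public class CachedFieldsWriter {

    public interface RecordSchema {
        List<String> getFields();
    }

    private final List<String> fields; // resolved once, not on every write()
    private int recordsWritten;

    public CachedFieldsWriter(RecordSchema schema) {
        // One schema traversal for the lifetime of the writer
        this.fields = schema.getFields();
    }

    public void write(Object record) {
        // fields is already available here; no per-record getFields() call
        recordsWritten++;
    }

    public int getRecordsWritten() {
        return recordsWritten;
    }

    public List<String> getFields() {
        return fields;
    }
}
```

The trade-off is the usual one: the schema must genuinely be immutable for the writer's lifetime, which holds here since each `ORCHDFSRecordWriter` is constructed for a single flow file's schema.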
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193182416

--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveStreamingConnection;
+import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.SerializationError;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StreamingIOFailure;
+import org.apache.hive.streaming.TransactionError;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
+        + "The partition values are expected to be the 'last' fields
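Elsewhere in this thread, the reviewers discuss registering the streaming connection's `close()` via `ShutdownHookManager.addShutdownHook(hiveStreamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1)` so it runs before the Hadoop `FileSystem` shutdown hook (avoiding a `ClosedChannelException`). Hadoop's `ShutdownHookManager` runs hooks in descending priority order. The toy registry below imitates only that ordering behavior; it is not the Hadoop API, and the priority constant value is an assumption based on Hadoop's `FileSystem.SHUTDOWN_HOOK_PRIORITY`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model of priority-ordered shutdown hooks: higher priority runs first,
// so a hook registered at FS priority + 1 closes before the filesystem does.
public class PriorityHooks {

    // FileSystem.SHUTDOWN_HOOK_PRIORITY is believed to be 10 in Hadoop; assumed here.
    static final int FS_SHUTDOWN_HOOK_PRIORITY = 10;

    private static final class Hook {
        final Runnable body;
        final int priority;
        Hook(Runnable body, int priority) { this.body = body; this.priority = priority; }
    }

    private final List<Hook> hooks = new ArrayList<>();

    public void addShutdownHook(Runnable body, int priority) {
        hooks.add(new Hook(body, priority));
    }

    /** Run hooks highest-priority first, mimicking Hadoop's ShutdownHookManager. */
    public void runAll() {
        hooks.stream()
                .sorted(Comparator.comparingInt((Hook h) -> h.priority).reversed())
                .forEach(h -> h.body.run());
    }

    public static void main(String[] args) {
        PriorityHooks mgr = new PriorityHooks();
        List<String> order = new ArrayList<>();
        mgr.addShutdownHook(() -> order.add("fs.close"), FS_SHUTDOWN_HOOK_PRIORITY);
        mgr.addShutdownHook(() -> order.add("streamingConnection.close"), FS_SHUTDOWN_HOOK_PRIORITY + 1);
        mgr.runAll();
        System.out.println(order); // [streamingConnection.close, fs.close]
    }
}
```

This is why the `+ 1` matters: with equal priorities the relative order of the two hooks would be unspecified, and the filesystem could close its channels while the streaming connection is still flushing.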
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193181866 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java --- @@ -0,0 +1,548 @@ ---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193181171 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java --- @@ -0,0 +1,247 @@ ---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193180621 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java --- @@ -0,0 +1,247 @@ ---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193179938 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java --- @@ -0,0 +1,247 @@ ---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193177877 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java --- @@ -0,0 +1,548 @@ ---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193178966

    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.orc.record;
    +
    +import org.apache.avro.Schema;
    +import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
    +import org.apache.hadoop.hive.ql.io.orc.Writer;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSet;
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.nifi.processors.orc.PutORC.HIVE_DDL_ATTRIBUTE;
    +
    +/**
    + * HDFSRecordWriter that writes ORC files using Avro as the schema representation.
    + */
    +
    +public class ORCHDFSRecordWriter implements HDFSRecordWriter {
    +
    +    private final Schema avroSchema;
    +    private final TypeInfo orcSchema;
    +    private final Writer orcWriter;
    +    private final String hiveTableName;
    +    private final boolean hiveFieldNames;
    +
    +    public ORCHDFSRecordWriter(final Writer orcWriter, final Schema avroSchema, final String hiveTableName, final boolean hiveFieldNames) {
    +        this.avroSchema = avroSchema;
    +        this.orcWriter = orcWriter;
    +        this.hiveFieldNames = hiveFieldNames;
    +        this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, this.hiveFieldNames);
    +        this.hiveTableName = hiveTableName;
    +    }
    +
    +    @Override
    +    public void write(final Record record) throws IOException {
    +        List<Schema.Field> fields = avroSchema.getFields();
    +        if (fields != null) {
    +            Object[] row = new Object[fields.size()];
    --- End diff --

Same for this array. If the fields does not change, can be a single allocation.

---
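The single-allocation suggestion above might look like the following simplified sketch: since the Avro schema (and therefore the field count) is fixed for the lifetime of the writer, the `Object[]` row buffer can be allocated once in the constructor and reused for every record instead of being re-allocated inside `write()`. `ReusableRowWriter` and its `Record` interface are hypothetical stand-ins for illustration, not the actual NiFi/ORC classes.

```java
import java.util.Arrays;
import java.util.List;

public class ReusableRowWriter {

    // Stand-in for the NiFi Record abstraction: look a value up by field name.
    interface Record {
        Object getValue(String fieldName);
    }

    private final List<String> fieldNames; // fixed once the schema is known
    private final Object[] row;            // single allocation, reused per record

    public ReusableRowWriter(List<String> fieldNames) {
        this.fieldNames = fieldNames;
        this.row = new Object[fieldNames.size()]; // hoisted out of write()
    }

    /** Fills the shared buffer and returns it; each call overwrites the previous row. */
    public Object[] write(Record record) {
        for (int i = 0; i < fieldNames.size(); i++) {
            row[i] = record.getValue(fieldNames.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        ReusableRowWriter w = new ReusableRowWriter(Arrays.asList("id", "name"));
        Object[] r1 = w.write(f -> f.equals("id") ? 1 : "alice");
        Object[] r2 = w.write(f -> f.equals("id") ? 2 : "bob");
        System.out.println(r1 == r2);            // true: the same buffer is reused
        System.out.println(Arrays.toString(r2)); // [2, bob]
    }
}
```

The caveat with buffer reuse is aliasing: the consumer must serialize or copy the row before the next `write()` call, and the pattern is only safe when each writer instance is driven by a single thread.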
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193178839

    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java ---
    +    @Override
    +    public void write(final Record record) throws IOException {
    +        List<Schema.Field> fields = avroSchema.getFields();
    --- End diff --

If fields does not change this can be outside of inner loop?

---
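The same hoisting applies to the `getFields()` call itself: an Avro `Schema` is effectively immutable once constructed, so the field list can be captured once in the constructor rather than fetched on every `write()`. A minimal sketch of the pattern, using a hypothetical `FakeSchema` stand-in rather than `org.apache.avro.Schema`:

```java
import java.util.Collections;
import java.util.List;

public class FieldListCaching {

    // Stand-in for an immutable schema whose getFields() returns the same list every call.
    static class FakeSchema {
        private final List<String> fields;
        FakeSchema(List<String> fields) { this.fields = Collections.unmodifiableList(fields); }
        List<String> getFields() { return fields; }
    }

    static class CachingWriter {
        private final List<String> cachedFields; // fetched once, not per record

        CachingWriter(FakeSchema schema) {
            // before: List<String> fields = schema.getFields(); inside write(), per record
            // after:  capture the invariant list a single time here
            this.cachedFields = schema.getFields();
        }

        int write(Object record) {
            // hot path touches only the cached list
            return cachedFields.size();
        }
    }

    public static void main(String[] args) {
        FakeSchema schema = new FakeSchema(java.util.Arrays.asList("a", "b", "c"));
        CachingWriter w = new CachingWriter(schema);
        System.out.println(w.write(new Object())); // 3
    }
}
```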
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193173275

    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    +    protected RecordReader recordReader;
    +    protected ComponentLog log;
    +    protected List<String> columnNames;
    +    protected StructTypeInfo schema;
    +
    +    protected StandardStructObjectInspector cachedObjectInspector;
    +    protected TimestampParser tsParser;
    +
    +    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
    +
    +    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    +        this.recordReader = recordReader;
    +        this.log = log;
    +    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {
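One subtlety in the `initialize()` method quoted above: the column *names* are split with a plain `String.split` on the delimiter, but the column *types* are handed to `TypeInfoUtils.getTypeInfosFromTypeString` instead. Hive type strings can nest — `struct<a:int,b:string>` contains both `:` and `,` — so a naive split would shred them. The depth-aware splitter below is an illustrative sketch of that requirement, not Hive's actual parser:

```java
import java.util.ArrayList;
import java.util.List;

public class TypeStringSplit {

    // Split a colon-delimited type list, but only at colons that sit at
    // angle-bracket depth 0, so nested types like struct<a:int> stay intact.
    static List<String> splitTopLevel(String typeString) {
        List<String> parts = new ArrayList<>();
        int depth = 0, start = 0;
        for (int i = 0; i < typeString.length(); i++) {
            char c = typeString.charAt(i);
            if (c == '<') depth++;
            else if (c == '>') depth--;
            else if (c == ':' && depth == 0) {
                parts.add(typeString.substring(start, i));
                start = i + 1;
            }
        }
        parts.add(typeString.substring(start));
        return parts;
    }

    public static void main(String[] args) {
        String types = "int:struct<a:int,b:string>:string";
        // Depth-aware split recovers the three column types.
        System.out.println(splitTopLevel(types));
        // [int, struct<a:int,b:string>, string]
        // A naive split(":") would produce 5 fragments — why it fails here.
        System.out.println(types.split(":").length); // 5
    }
}
```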
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193174462

    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---

---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193176667

    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java ---

---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r192763045

    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import com.google.common.base.Joiner;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.List;
    +import java.util.Properties;
    +
    +public class HiveRecordWriter extends AbstractRecordWriter {
    --- End diff --

@prasanthj Do you mind taking a look at HiveRecordWriter and NiFiRecordSerDe (and PutHive3Streaming, which uses them when creating the connection and passing in options)? Those are the custom impls for the new Hive Streaming API classes, hoping for suggestions on improving performance, etc. Thanks in advance!

---
[GitHub] nifi pull request #2755: NIFI-4963: Added Hive3 bundle
GitHub user mattyb149 opened a pull request:

    https://github.com/apache/nifi/pull/2755

    NIFI-4963: Added Hive3 bundle

    You'll need to activate the include-hive3 profile when building the assembly; it is currently excluded by default due to its size (~200 MB).

    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    - [x] Is your initial contribution a single, squashed commit?

    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

    ### For documentation related changes:
    - [x] Have you ensured that format looks appropriate for the output in which it is rendered?

    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mattyb149/nifi NIFI-4963

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2755

----
commit 417bc821d277a0556842f5aa734d854ca225147b
Author: Matthew Burgess
Date:   2018-06-04T14:29:08Z

    NIFI-4963: Added Hive3 bundle

---