You may want to look here: https://community.hortonworks.com/content/kbentry/4321/hive-acid-current-state.html
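Since it comes up whenever someone sets up ACID streaming from scratch: Hive Streaming also needs transactions enabled on the metastore side, independent of the table DDL. A typical hive-site.xml fragment for Hive 1.x looks like this (the values are illustrative, not tuned; check them against your own cluster):

```xml
<!-- Minimal transaction settings for Hive ACID / streaming ingest (Hive 1.x).
     Values are illustrative; size compactor threads for your cluster. -->
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
```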
Flume 1.5.2 doesn't include Hive support AFAIK, so whatever they built for Hortonworks is their own build. Note that the sink docs say, "This sink is provided as a preview feature and not recommended for use in production." ( https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/ds_flume/FlumeUserGuide.html ) It appears they are using Hive 1.2.1; I'm not sure which version the sink lines up with.

Looking here: http://hortonworks.com/blog/adding-acid-to-apache-hive/ it appears Hive gained support for ACID inserts in 0.14.0 ( https://issues.apache.org/jira/browse/HIVE-5317 ), but there is a bug about transactions closing ( https://issues.apache.org/jira/browse/HIVE-12307?jql=project%20%3D%20HIVE%20AND%20text%20~%20%22TransactionBatch%20closed%22 ) that is fixed in Hive 1.3.0.

On Tue, Jul 5, 2016 at 1:01 AM, Thanh Hong Dai <[email protected]> wrote:

> I forgot to include the version information. I'm currently using Flume
> 1.5.2 from HDP 2.4.2.
>
> Looking at the changelog of Flume 1.6.0, the latest version, there seem
> to be some improvements to Hive support. This makes me wonder: does
> Flume 1.5.2 support Hive streaming to an ACID table?
>
> Best regards,
> Thanh Hong.
>
> *From:* Thanh Hong Dai [mailto:[email protected]]
> *Sent:* Tuesday, 5 July, 2016 11:47 AM
> *To:* [email protected]
> *Subject:* Unable to deliver event.
> org.apache.flume.EventDeliveryException: java.lang.IllegalStateException:
> TransactionBatch has been closed()
>
> Does anyone know the cause of this exception when using the Hive sink,
> and how to fix it?
>
> The Hive sink managed to write data into the Hive table for a few minutes
> (which I can confirm by querying the table), but then it shows the
> exception below in the log file (/var/log/flume/flume-<streamname>.log)
> on all the nodes.
>
> 05 Jul 2016 04:24:22,737 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event.
> Exception follows.
>
> org.apache.flume.EventDeliveryException: java.lang.IllegalStateException:
> TransactionBatch TxnIds=[29489...30488] on endPoint =
> {metaStoreUri='thrift://hive.metastore:9083', database='default',
> table='acid', partitionVals=[0804] } has been closed()
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: TransactionBatch
> TxnIds=[29489...30488] on endPoint =
> {metaStoreUri='thrift://hive.metastore:9083', database='default',
> table='acid', partitionVals=[0804] } has been closed()
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)
>         at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)
>         at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)
>         at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         ... 1 more
>
> 05 Jul 2016 04:24:27,891 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event.
> Exception follows.
>
> org.apache.flume.EventDeliveryException: java.lang.IllegalStateException:
> TransactionBatch TxnIds=[29489...30488] on endPoint =
> {metaStoreUri='thrift://hive.metastore:9083', database='default',
> table='acid', partitionVals=[0804] } has been closed()
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: TransactionBatch
> TxnIds=[29489...30488] on endPoint =
> {metaStoreUri='thrift://hive.metastore:9083', database='default',
> table='acid', partitionVals=[0804] } has been closed()
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)
>         at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)
>         at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)
>         at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         ... 1 more
>
> My flume.conf file:
>
> # acidstream - streaming data from Kafka into Hive transactional table
> acidstream.sources = kafka-source
> acidstream.sinks = hive-sink
> acidstream.channels = gutter
>
> acidstream.sources.kafka-source.channels = gutter
> acidstream.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
> acidstream.sources.kafka-source.zookeeperConnect = chdhost125.vitaldev.tma.com.vn:2181,chdhost27.vitaldev.tma.com.vn:2181,chdhost185.vitaldev.tma.com.vn:2181
> acidstream.sources.kafka-source.topic = lan
> acidstream.sources.kafka-source.groupId = acid
> acidstream.sources.kafka-source.batchSize = 10000
> acidstream.sources.kafka-source.batchDurationMillis = 60000
> acidstream.sources.kafka-source.kafka.consumer.timeout.ms = 200
>
> acidstream.sources.kafka-source.interceptors = i1
> acidstream.sources.kafka-source.interceptors.i1.type = regex_extractor
> acidstream.sources.kafka-source.interceptors.i1.regex = ^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})
> acidstream.sources.kafka-source.interceptors.i1.serializers = s1
> acidstream.sources.kafka-source.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
> acidstream.sources.kafka-source.interceptors.i1.serializers.s1.name = timestamp
> acidstream.sources.kafka-source.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss
>
> acidstream.sinks.hive-sink.channel = gutter
> acidstream.sinks.hive-sink.type = hive
> acidstream.sinks.hive-sink.hive.metastore = thrift://hive.metastore:9083
> acidstream.sinks.hive-sink.hive.database = default
> acidstream.sinks.hive-sink.hive.table = acid
> acidstream.sinks.hive-sink.hive.partition = %m%d
> acidstream.sinks.hive-sink.heartBeatInterval = 10
> acidstream.sinks.hive-sink.useLocalTimeStamp = false
> acidstream.sinks.hive-sink.round = false
> acidstream.sinks.hive-sink.hive.txnsPerBatchAsk = 1000
> acidstream.sinks.hive-sink.batchSize = 10000
> acidstream.sinks.hive-sink.callTimeout = 30000
> acidstream.sinks.hive-sink.serializer = DELIMITED
> acidstream.sinks.hive-sink.serializer.delimiter = "\t"
> acidstream.sinks.hive-sink.serializer.serdeSeparator = '\t'
> acidstream.sinks.hive-sink.serializer.fieldnames = timestamp,id,data
>
> acidstream.channels.gutter.type = memory
> acidstream.channels.gutter.capacity = 100000
> acidstream.channels.gutter.transactionCapacity = 50000
>
> My flume-env file has this line added:
>
> export JAVA_OPTS="-Xms100m -Xmx3g"
>
> My table in Hive has the following properties:
>
> PARTITIONED BY (md string)
> CLUSTERED BY (id) INTO 10 BUCKETS
> STORED AS ORC
> TBLPROPERTIES ('transactional' = 'true');
>
> Hive has the Tez engine set as the default execution engine.
>
> Could this error be caused by a low number of threads? (The NameNode has
> 100 server threads available.)
>
> Best regards,
> Thanh Hong.
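To make the failure mode concrete: the Hive sink fetches a TransactionBatch (txnsPerBatchAsk transactions at a time) and writes events into successive transactions; once the batch is closed (exhausted, aborted, or hit by the HIVE-12307 race), every later write() throws the IllegalStateException in the log until a new batch is fetched. The checkIsClosed frame at HiveEndPoint.java:690 in the trace is exactly that guard. Here is a self-contained toy model of the lifecycle, not the real org.apache.hive.hcatalog.streaming API, just the state machine behind the message:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the HCatalog streaming TransactionBatch lifecycle (NOT the real API). */
class ToyTransactionBatch {
    private int remainingTxns;              // txnsPerBatchAsk at fetch time
    private boolean closed = false;
    private final List<String> committed = new ArrayList<>();
    private List<String> current;

    ToyTransactionBatch(int txnsPerBatchAsk) {
        this.remainingTxns = txnsPerBatchAsk;
    }

    void beginNextTransaction() {
        checkIsClosed();
        if (remainingTxns == 0) {
            closed = true;                  // a real batch is done once its txns run out
            checkIsClosed();
        }
        remainingTxns--;
        current = new ArrayList<>();
    }

    void write(String record) {
        checkIsClosed();                    // this is where the sink blows up
        current.add(record);
    }

    void commit() {
        checkIsClosed();
        committed.addAll(current);
    }

    /** A heartbeat failure or abort also ends up here. */
    void close() { closed = true; }

    private void checkIsClosed() {
        if (closed) {
            throw new IllegalStateException("TransactionBatch has been closed()");
        }
    }

    int committedCount() { return committed.size(); }
}

public class BatchLifecycleDemo {
    public static void main(String[] args) {
        ToyTransactionBatch batch = new ToyTransactionBatch(2);
        batch.beginNextTransaction();
        batch.write("event-1");
        batch.commit();

        // Something (abort, timeout, heartbeat on a dead batch) closes the batch...
        batch.close();
        try {
            batch.write("event-2");         // ...and every later write fails, as in the log
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
        System.out.println("committed=" + batch.committedCount());
    }
}
```

The recovery the real sink has to perform is the part the toy skips: on IllegalStateException, discard the batch and fetch a fresh one, which is what the HIVE-12307 fix in 1.3.0 makes reliable.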
