Thanks for your reply. I guess I will have to wait for HDP to include the new 
version of Hive.

 

Best regards,

Thanh Hong.

 

From: Joe Lawson [mailto:[email protected]] 
Sent: Wednesday, 6 July, 2016 2:41 AM
To: [email protected]
Subject: Re: Unable to deliver event. org.apache.flume.EventDeliveryException: 
java.lang.IllegalStateException: TransactionBatch has been closed()

 

You may want to look here: 
https://community.hortonworks.com/content/kbentry/4321/hive-acid-current-state.html

 

Flume 1.5.2 doesn't include Hive support AFAIK, so whatever they built for 
Hortonworks is their own build. Note that the sink docs say, "This sink is 
provided as a preview feature and not recommended for use in production." 
(https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/ds_flume/FlumeUserGuide.html)
 It appears they are using Hive version 1.2.1; not sure which version the sink 
lines up with. 

 

Looking here: http://hortonworks.com/blog/adding-acid-to-apache-hive/

 

It appears that Hive got support for ACID inserts in 0.14.0 
(https://issues.apache.org/jira/browse/HIVE-5317), but there is a bug 
(https://issues.apache.org/jira/browse/HIVE-12307?jql=project%20%3D%20HIVE%20AND%20text%20~%20%22TransactionBatch%20closed%22)
 about transactions closing that is fixed in Hive 1.3.0.
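Until a build with that fix is available, the usual mitigation is to keep the sink's heartbeat interval comfortably below Hive's transaction timeout, so open transaction batches are not expired by the metastore mid-stream. A hedged sketch (property names from the Flume Hive sink docs and hive-site.xml; values are the documented defaults as I understand them):

```
# flume.conf: the Hive sink sends heartbeats on open transaction batches
# every heartBeatInterval seconds (0 disables heartbeats; default is 240)
agent.sinks.hive-sink.heartBeatInterval = 240

# hive-site.xml (shown as key = value for brevity): the metastore aborts
# transactions that miss heartbeats for hive.txn.timeout seconds
# (default 300) -- keep heartBeatInterval well below this value
hive.txn.timeout = 300
```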

 

On Tue, Jul 5, 2016 at 1:01 AM, Thanh Hong Dai <[email protected]> wrote:

I forgot to include the version information. I’m currently using Flume 1.5.2 
from HDP 2.4.2.

 

Looking at the changelog of Flume 1.6.0, the latest version, there seem to be 
some improvements to Hive support.

This makes me wonder: does Flume 1.5.2 support Hive streaming to ACID tables?

 

Best regards,

Thanh Hong.

 

From: Thanh Hong Dai [mailto:[email protected]] 
Sent: Tuesday, 5 July, 2016 11:47 AM
To: [email protected]
Subject: Unable to deliver event. org.apache.flume.EventDeliveryException: 
java.lang.IllegalStateException: TransactionBatch has been closed()

 

Does anyone know the cause of this exception when using the Hive Sink, and how 
to fix it?

 

The Hive Sink managed to write data into the Hive table for a few minutes 
(which I can confirm by querying the table), but then it shows the exception 
below in the log file (/var/log/flume/flume-<streamname>.log) on all the nodes.

 

05 Jul 2016 04:24:22,737 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] 
(org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. 
Exception follows.

org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: 
TransactionBatch TxnIds=[29489...30488] on endPoint = 
{metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', 
partitionVals=[0804] } has been closed()

        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)

        at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)

        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.IllegalStateException: TransactionBatch 
TxnIds=[29489...30488] on endPoint = 
{metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', 
partitionVals=[0804] } has been closed()

        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)

        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)

        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)

        at 
org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)

        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)

        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)

        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        ... 1 more

05 Jul 2016 04:24:27,891 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] 
(org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. 
Exception follows.

[the same exception and stack trace as above repeat]

 

My flume.conf file:

 

# acidstream - streaming data from Kafka into Hive transactional table

acidstream.sources = kafka-source

acidstream.sinks = hive-sink

acidstream.channels = gutter

 

acidstream.sources.kafka-source.channels = gutter

acidstream.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource

acidstream.sources.kafka-source.zookeeperConnect = 
chdhost125.vitaldev.tma.com.vn:2181 
<http://chdhost125.vitaldev.tma.com.vn:2181> 
,chdhost27.vitaldev.tma.com.vn:2181 <http://chdhost27.vitaldev.tma.com.vn:2181> 
,chdhost185.vitaldev.tma.com.vn:2181 
<http://chdhost185.vitaldev.tma.com.vn:2181> 

acidstream.sources.kafka-source.topic = lan

acidstream.sources.kafka-source.groupId = acid

acidstream.sources.kafka-source.batchSize = 10000

acidstream.sources.kafka-source.batchDurationMillis = 60000

acidstream.sources.kafka-source.kafka.consumer.timeout.ms = 200

 

acidstream.sources.kafka-source.interceptors = i1

acidstream.sources.kafka-source.interceptors.i1.type = regex_extractor

acidstream.sources.kafka-source.interceptors.i1.regex = 
^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})

acidstream.sources.kafka-source.interceptors.i1.serializers = s1

acidstream.sources.kafka-source.interceptors.i1.serializers.s1.type = 
org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer

acidstream.sources.kafka-source.interceptors.i1.serializers.s1.name = 
timestamp

acidstream.sources.kafka-source.interceptors.i1.serializers.s1.pattern = 
yyyy-MM-dd HH:mm:ss

 

acidstream.sinks.hive-sink.channel = gutter

acidstream.sinks.hive-sink.type = hive

acidstream.sinks.hive-sink.hive.metastore = thrift://hive.metastore:9083

acidstream.sinks.hive-sink.hive.database = default

acidstream.sinks.hive-sink.hive.table = acid

acidstream.sinks.hive-sink.hive.partition = %m%d

acidstream.sinks.hive-sink.heartBeatInterval = 10

acidstream.sinks.hive-sink.useLocalTimeStamp = false

acidstream.sinks.hive-sink.round = false

acidstream.sinks.hive-sink.hive.txnsPerBatchAsk = 1000

acidstream.sinks.hive-sink.batchSize = 10000

acidstream.sinks.hive-sink.callTimeout = 30000

acidstream.sinks.hive-sink.serializer = DELIMITED

acidstream.sinks.hive-sink.serializer.delimiter = "\t"

acidstream.sinks.hive-sink.serializer.serdeSeparator = '\t'

acidstream.sinks.hive-sink.serializer.fieldnames = timestamp,id,data

 

acidstream.channels.gutter.type = memory

acidstream.channels.gutter.capacity = 100000

acidstream.channels.gutter.transactionCapacity = 50000
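For what it's worth, given serializer.delimiter = "\t", fieldnames = 
timestamp,id,data, and the regex interceptor above, each Kafka message would 
need to look roughly like this (hypothetical record; the leading timestamp 
both populates the timestamp header that the %m%d partition escape reads and 
lands in the first column):

```
2016-07-05 04:24:22	42	some payload text
```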

 

My flume-env file has this line added:

 

export JAVA_OPTS="-Xms100m -Xmx3g"

 

My table on Hive has the following properties:

 

PARTITIONED BY (md string)

CLUSTERED BY (id) INTO 10 BUCKETS

STORED AS ORC

TBLPROPERTIES ('transactional' = 'true');
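For reference, a complete version of that DDL, assuming the three columns the 
sink's fieldnames imply (column names and types are my guess; `timestamp` is 
backtick-quoted because it is a Hive keyword -- adjust to your actual schema):

```
CREATE TABLE acid (
  `timestamp` timestamp,  -- first delimited field from the sink
  id int,                 -- bucketing column
  data string
)
PARTITIONED BY (md string)
CLUSTERED BY (id) INTO 10 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
```

As I understand it, streaming ingest also requires the transactional settings 
on the Hive side (hive.txn.manager set to DbTxnManager, concurrency support, 
and the compactor enabled), not just the table property.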

 

Hive has Tez engine set as the default execution engine.

 

Could this error be caused by a low number of threads? (The NameNode has 100 
server threads available.)

 

Best regards,

Thanh Hong.

 

 
