Brock, I assume you mean "abort" when you say "destroy". If so, yes, I too think that closing should abort an uncommitted transaction. I had the same thought when reading through the memory channel implementation.
-roshan

On Thu, Nov 15, 2012 at 4:50 AM, Brock Noland wrote:

> Can you log the Throwable as the first thing in the catch block to see
> whether something is being thrown and, if so, what it is?
>
> Transactions are thread local, so if for some reason the sequencing
> gets messed up on an earlier call to process, every call to
> transaction will throw an exception, including begin.
>
> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
>
> As I stated in FLUME-1089, I think that when close is called it should
> forcefully destroy the transaction, like JDBC close(), but I have not
> got much agreement.
>
> On Thu, Nov 15, 2012 at 5:24 AM, Andrew Jones wrote:
> > We are using Flume 1.2.0. We have a custom source, although it passes
> > through an Avro Sink and Source before getting to the sink. We are now
> > using the memory channel, although we had just switched from the JDBC
> > channel when we started seeing these errors, so maybe that has
> > something to do with it?
> >
> > I tried wrapping transaction.rollback(); in a try/catch and logging in
> > the catch, but it wasn't called, so I don't think the rollback is
> > throwing an error.
> >
> > I think it may have something to do with switching channels, as right
> > after Flume reloaded the config we started getting errors. I have
> > restarted the flume node manually and we are still getting the error.
> >
> > Thanks,
> > Andrew
> >
> > On 14 November 2012 20:02, Hari Shreedharan wrote:
> >>
> >> Which version of Flume are you using? It looks like the transaction was
> >> never rolled back or committed. It is likely that the rollback method
> >> too threw some exception, and the rollback was not successful. Also,
> >> what channel are you using?
> >>
> >> Thanks,
> >> Hari
> >>
> >> --
> >> Hari Shreedharan
> >>
> >> On Wednesday, November 14, 2012 at 8:55 AM, Andrew Jones wrote:
> >>
> >> Hi,
> >>
> >> I have a custom sink which has been working fine, but recently I have
> >> started seeing this error in the logs:
> >>
> >> Unable to deliver event. Exception follows.
> >> java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
> >> at com.google.common.base.Preconditions.checkState(Preconditions.java:176)
> >> ...
> >>
> >> After having a google and finding
> >> https://issues.apache.org/jira/browse/FLUME-1089, I have double checked
> >> I am using the correct try, catch, finally idiom that other sinks use,
> >> and I seem to be doing the same. I do the following:
> >>
> >> public Status process() throws EventDeliveryException {
> >>   Status status = Status.READY;
> >>
> >>   Channel channel = getChannel();
> >>   Transaction transaction = channel.getTransaction();
> >>
> >>   try {
> >>     transaction.begin();
> >>
> >>     // does a bit of processing and
> >>     // writes out the event to MongoDB
> >>
> >>     transaction.commit();
> >>
> >>   } catch (Throwable t) {
> >>     transaction.rollback();
> >>
> >>     if (t instanceof Error) {
> >>       throw (Error) t;
> >>     } else if (t instanceof EventDeliveryException) {
> >>       throw (EventDeliveryException) t;
> >>     } else if (t instanceof ChannelException) {
> >>       logger.error("Brodie Log Sink " + getName() + ": Unable to get event from" +
> >>           " channel " + channel.getName() + ". Exception follows.", t);
> >>       status = Status.BACKOFF;
> >>     } else {
> >>       throw new EventDeliveryException("Failed to send events", t);
> >>     }
> >>   } finally {
> >>     transaction.close();
> >>   }
> >>
> >>   return status;
> >> }
> >>
> >> }
> >>
> >> All of this code came from looking at other sinks (Avro and HDFS), so I
> >> am pretty sure it's correct.
> >>
> >> Can anyone see anything that might be a problem, or is there anything
> >> else I can do to avoid this error?
> >>
> >> Thanks,
> >> Andrew
>
> --
> Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
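
To make the two points in this thread concrete (a thread-local transaction that stays OPEN poisons every later call, and a close() that aborts would avoid that), here is a minimal, self-contained sketch. It is a simplified model and not Flume's code: SketchTransaction, SketchChannel, leakyCall and the abortOnClose flag are all hypothetical names invented for illustration, and the error messages are shortened.

```java
public class TransactionCloseSketch {

    enum State { NEW, OPEN, COMPLETED, CLOSED }

    /** Hypothetical transaction; a simplified model, not Flume's class. */
    static final class SketchTransaction {
        // Assumption: true models the "close aborts" behaviour proposed in the thread.
        private final boolean abortOnClose;
        private State state = State.NEW;

        SketchTransaction(boolean abortOnClose) { this.abortOnClose = abortOnClose; }

        State state() { return state; }

        void begin()    { check(state == State.NEW, "begin");     state = State.OPEN; }
        void commit()   { check(state == State.OPEN, "commit");   state = State.COMPLETED; }
        void rollback() { check(state == State.OPEN, "rollback"); state = State.COMPLETED; }

        void close() {
            if (state == State.OPEN && abortOnClose) {
                state = State.COMPLETED; // abort: treat uncommitted work as rolled back
            }
            // Strict mode: closing an OPEN transaction fails and leaves it OPEN.
            check(state == State.NEW || state == State.COMPLETED, "close");
            state = State.CLOSED;
        }

        private void check(boolean ok, String call) {
            if (!ok) {
                throw new IllegalStateException(call + "() called when transaction is " + state);
            }
        }
    }

    /** Hypothetical channel: one transaction per thread, reused until it is CLOSED. */
    static final class SketchChannel {
        private final boolean abortOnClose;
        private final ThreadLocal<SketchTransaction> current = new ThreadLocal<>();

        SketchChannel(boolean abortOnClose) { this.abortOnClose = abortOnClose; }

        SketchTransaction getTransaction() {
            SketchTransaction tx = current.get();
            if (tx == null || tx.state() == State.CLOSED) {
                tx = new SketchTransaction(abortOnClose);
                current.set(tx);
            }
            return tx;
        }
    }

    /** Simulates a process() call that reaches close() without commit or rollback. */
    static String leakyCall(SketchChannel channel) {
        SketchTransaction tx = channel.getTransaction();
        try {
            tx.begin();
            // neither commit() nor rollback() is reached on this path
            tx.close();
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        SketchChannel strict = new SketchChannel(false);
        System.out.println(leakyCall(strict));  // close() fails, transaction stays OPEN
        System.out.println(leakyCall(strict));  // same transaction comes back, so begin() fails
        SketchChannel lenient = new SketchChannel(true);
        System.out.println(leakyCall(lenient)); // close() aborts, then closes
        System.out.println(leakyCall(lenient)); // a fresh transaction is handed out
    }
}
```

In this model, once the strict transaction is left OPEN, every later call on the same thread gets the same object back and fails at begin(), which matches the symptom of the error persisting from one process() call to the next. With abortOnClose set, the same sequence recovers on the next call. Whether Flume should behave that way is the open question from FLUME-1089; the sketch only illustrates the difference.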
