Would you be able to send the source of your sink via pastebin in addition to your config?

On Fri, Nov 16, 2012 at 9:21 AM, Andrew Jones <[email protected]> wrote:
> I tried logging the first throwable, but now that is just the
> IllegalStateException.
>
> Today I have been looking at Flume-1.3.0rc3 and I have noticed the same
> problem. This is using the Avro source, File channel and our custom sink.
> After Flume reloads its config, the first error message comes when the Avro
> source starts up:
>
> 16 Nov 2012 16:04:25,237 INFO [lifecycleSupervisor-1-4]
> (org.apache.flume.source.AvroSource.start:142) - Starting Avro source
> source: { bindAddress: 0.0.0.0, port: 36060 }...
> 16 Nov 2012 16:04:25,484 ERROR
> [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver
> event. Exception follows.
> java.lang.IllegalStateException: close() called when transaction is OPEN -
> you must either commit or rollback first
>     at com.google.common.base.Preconditions.checkState(Preconditions.java:176)
>     at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>     at com.arm.pd.brodie.flume.sink.ResultSink.processSingle(ResultSink.java:440)
>     at com.arm.pd.brodie.flume.sink.ResultSink.process(ResultSink.java:172)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:636)
> 16 Nov 2012 16:04:25,594 INFO [lifecycleSupervisor-1-4]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.register:89) -
> Monitoried counter group for type: SOURCE, name: source, registered
> successfully.
> 16 Nov 2012 16:04:25,595 INFO [lifecycleSupervisor-1-4]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.start:73) -
> Component type: SOURCE, name: source started
> 16 Nov 2012 16:04:25,595 INFO [lifecycleSupervisor-1-4]
> (org.apache.flume.source.AvroSource.start:168) - Avro source source
> started.
>
> I then continually get errors from the Sink, presumably as it's been called
> periodically to check for events in the channel. So is it possible it's the
> Avro source causing the issue?
>
> There should have been nothing persisted in the file channel when
> restarting.
>
> When the transaction gets messed up like this, is there a way to refresh it,
> preferably without losing any data?
>
> I am still able to send things to flume and they get processed and inserted
> by my sink, so it still seems to work OK.
>
> Thanks,
> Andrew
>
>
>
> On 15 November 2012 12:50, Brock Noland <[email protected]> wrote:
>>
>> Can you log the Throwable as the first thing in the catch block to see
>> whether something is being thrown, and what it is?
>>
>> Transactions are thread local so if for some reason the sequencing
>> gets messed up on an earlier call to process, every call to
>> transaction will throw an exception including begin.
>>
>>
>> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
>>
>> As I stated in FLUME-1089 I think that when close is called it should
>> forcefully destroy the transaction like JDBC close() but I have not
>> got much agreement.
>>
>>
>> On Thu, Nov 15, 2012 at 5:24 AM, Andrew Jones <[email protected]>
>> wrote:
>> > We are using Flume 1.2.0. We have a custom source, although it passes
>> > through an Avro Sink and Source before getting to the sink. We are now
>> > using the memory channel, although had just switched from the JDBC
>> > channel when we started seeing these errors, so maybe that's something
>> > to do with it?
>> >
>> > I tried wrapping transaction.rollback(); in a try catch and logging in
>> > the catch, but it wasn't called, so I don't think the rollback is
>> > throwing an error.
>> >
>> > I think it may have something to do with switching channels, as right
>> > after Flume reloaded the config we started getting errors.
>> > I have restarted the flume node manually and we are still getting the
>> > error.
>> >
>> > Thanks,
>> > Andrew
>> >
>> >
>> > On 14 November 2012 20:02, Hari Shreedharan <[email protected]>
>> > wrote:
>> >>
>> >> Which version of Flume are you using? It looks like the transaction was
>> >> never rolled back or committed. It is likely that the rollback method
>> >> too threw some exception, and the rollback was not successful. Also,
>> >> what channel are you using?
>> >>
>> >>
>> >> Thanks,
>> >> Hari
>> >>
>> >> --
>> >> Hari Shreedharan
>> >>
>> >> On Wednesday, November 14, 2012 at 8:55 AM, Andrew Jones wrote:
>> >>
>> >> Hi,
>> >>
>> >> I have a custom sink which has been working fine, but recently I have
>> >> started seeing this error in the logs:
>> >>
>> >> Unable to deliver event. Exception follows.
>> >> java.lang.IllegalStateException: close() called when transaction is
>> >> OPEN - you must either commit or rollback first
>> >>     at com.google.common.base.Preconditions.checkState(Preconditions.java:176)
>> >>     ...
>> >>
>> >>
>> >> After having a google and finding
>> >> https://issues.apache.org/jira/browse/FLUME-1089, I have double checked
>> >> I am using the correct try, catch, finally idiom that other sinks use,
>> >> and I seem to be doing the same.
>> >> I do the following:
>> >>
>> >> public Status process() throws EventDeliveryException {
>> >>   Status status = Status.READY;
>> >>
>> >>   Channel channel = getChannel();
>> >>   Transaction transaction = channel.getTransaction();
>> >>
>> >>   try {
>> >>     transaction.begin();
>> >>
>> >>     // does a bit of processing and
>> >>     // writes out the event to MongoDB
>> >>
>> >>     transaction.commit();
>> >>
>> >>   } catch (Throwable t) {
>> >>     transaction.rollback();
>> >>
>> >>     if (t instanceof Error) {
>> >>       throw (Error) t;
>> >>     } else if (t instanceof EventDeliveryException) {
>> >>       throw (EventDeliveryException) t;
>> >>     } else if (t instanceof ChannelException) {
>> >>       logger.error("Brodie Log Sink " + getName() + ": Unable to get event from"
>> >>           + " channel " + channel.getName() + ". Exception follows.", t);
>> >>       status = Status.BACKOFF;
>> >>     } else {
>> >>       throw new EventDeliveryException("Failed to send events", t);
>> >>     }
>> >>   } finally {
>> >>     transaction.close();
>> >>   }
>> >>
>> >>   return status;
>> >> }
>> >>
>> >> }
>> >>
>> >> All of this code came from looking at other sinks (Avro and HDFS), so I
>> >> am pretty sure it's correct.
>> >>
>> >> Can anyone see anything that might be a problem, or is there anything
>> >> else I can do to avoid this error?
>> >>
>> >> Thanks,
>> >> Andrew
>> >>
>> >>
>> >
>>
>>
>>
>> --
>> Apache MRUnit - Unit testing MapReduce -
>> http://incubator.apache.org/mrunit/
>
>
--
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
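
For anyone reading the thread later, the point quoted above about transactions being thread local can be illustrated with a small standalone model. This is only a sketch under assumptions, not Flume's code: MiniChannel, MiniTransaction and processOnce are made-up names, and the state checks are assumed to roughly mirror what BasicTransactionSemantics enforces (begin() only from NEW, close() only from NEW or COMPLETED, and a fresh transaction handed out only once the previous one on that thread is CLOSED).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (NOT Flume's code) of a channel that keeps one
 * transaction per thread and only creates a new one after close().
 */
public class ThreadLocalTxSketch {

    enum State { NEW, OPEN, COMPLETED, CLOSED }

    /** Hypothetical stand-in for a channel transaction. */
    static class MiniTransaction {
        State state = State.NEW;

        void begin() {
            check(state == State.NEW, "begin() called when transaction is " + state);
            state = State.OPEN;
        }

        void commit() {
            check(state == State.OPEN, "commit() called when transaction is " + state);
            state = State.COMPLETED;
        }

        void rollback() {
            check(state == State.OPEN, "rollback() called when transaction is " + state);
            state = State.COMPLETED;
        }

        void close() {
            // An OPEN transaction cannot be closed, so its state is left untouched.
            check(state == State.NEW || state == State.COMPLETED,
                  "close() called when transaction is " + state);
            state = State.CLOSED;
        }

        private static void check(boolean ok, String message) {
            if (!ok) {
                throw new IllegalStateException(message);
            }
        }
    }

    /** Hypothetical stand-in for a channel with per-thread transactions. */
    static class MiniChannel {
        private final ThreadLocal<MiniTransaction> current = new ThreadLocal<>();

        MiniTransaction getTransaction() {
            MiniTransaction tx = current.get();
            // The same object is reused on this thread until it reaches CLOSED.
            if (tx == null || tx.state == State.CLOSED) {
                tx = new MiniTransaction();
                current.set(tx);
            }
            return tx;
        }
    }

    /** One sink-style call; when leak is true, commit/rollback is skipped. */
    static String processOnce(MiniChannel channel, boolean leak) {
        MiniTransaction tx = channel.getTransaction();
        try {
            tx.begin();
            if (!leak) {
                tx.commit();
            }
            tx.close();
            return "ok";
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    /** Runs a good call, a leaking call, then another good call on one thread. */
    public static List<String> demo() {
        MiniChannel channel = new MiniChannel();
        List<String> outcomes = new ArrayList<>();
        outcomes.add(processOnce(channel, false)); // clean commit and close
        outcomes.add(processOnce(channel, true));  // close() while still OPEN
        outcomes.add(processOnce(channel, false)); // reuses the stale OPEN transaction
        return outcomes;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the third call fails even though it does nothing wrong itself, because the thread still holds the OPEN transaction left behind by the second call. Calling rollback() before close() is what would return the thread to a clean state here; whether the real sink hits the same path depends on code that is not shown in the thread.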
