If channel.take() returns null, neither commit nor rollback is called. Check out how the logger sink handles this:
https://git-wip-us.apache.org/repos/asf?p=flume.git;a=blob;f=flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java;h=128fa8427af633c0c7c50093f8f6c4ef9bb8ae76;hb=HEAD

Brock

On Fri, Nov 16, 2012 at 9:45 AM, Andrew Jones <[email protected]> wrote:
> Sure.
>
> Sink: http://pastebin.com/N6zh73hU
> Config: http://pastebin.com/Tc2MH9iV
>
> Thanks.
>
> On 16 November 2012 15:32, Brock Noland <[email protected]> wrote:
>>
>> Would you be able to send the source of your sink via pastebin in
>> addition to your config?
>>
>> On Fri, Nov 16, 2012 at 9:21 AM, Andrew Jones <[email protected]> wrote:
>> > I tried logging the first throwable, but now that is just the
>> > IllegalStateException.
>> >
>> > Today I have been looking at Flume-1.3.0rc3 and I have noticed the same
>> > problem. This is using the Avro source, File channel and our custom sink.
>> > After Flume reloads its config, the first error message comes when the
>> > Avro source starts up:
>> >
>> > 16 Nov 2012 16:04:25,237 INFO [lifecycleSupervisor-1-4] (org.apache.flume.source.AvroSource.start:142) - Starting Avro source source: { bindAddress: 0.0.0.0, port: 36060 }...
>> > 16 Nov 2012 16:04:25,484 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
>> > java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
>> >     at com.google.common.base.Preconditions.checkState(Preconditions.java:176)
>> >     at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>> >     at com.arm.pd.brodie.flume.sink.ResultSink.processSingle(ResultSink.java:440)
>> >     at com.arm.pd.brodie.flume.sink.ResultSink.process(ResultSink.java:172)
>> >     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>> >     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>> >     at java.lang.Thread.run(Thread.java:636)
>> > 16 Nov 2012 16:04:25,594 INFO [lifecycleSupervisor-1-4] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:89) - Monitoried counter group for type: SOURCE, name: source, registered successfully.
>> > 16 Nov 2012 16:04:25,595 INFO [lifecycleSupervisor-1-4] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:73) - Component type: SOURCE, name: source started
>> > 16 Nov 2012 16:04:25,595 INFO [lifecycleSupervisor-1-4] (org.apache.flume.source.AvroSource.start:168) - Avro source source started.
>> >
>> > I then continually get errors from the Sink, presumably as it's being
>> > called periodically to check for events in the channel. So is it possible
>> > it's the Avro source causing the issue?
>> >
>> > There should have been nothing persisted in the file channel when
>> > restarting.
>> >
>> > When the transaction gets messed up like this, is there a way to refresh
>> > it, preferably without losing any data?
>> >
>> > I am still able to send things to flume and they get processed and
>> > inserted by my sink, so it still seems to work OK.
>> >
>> > Thanks,
>> > Andrew
>> >
>> > On 15 November 2012 12:50, Brock Noland <[email protected]> wrote:
>> >>
>> >> Can you log the Throwable as the first thing in the catch block to see
>> >> whether something is being thrown, and if so what it is?
>> >>
>> >> Transactions are thread local, so if for some reason the sequencing
>> >> gets messed up on an earlier call to process, every call to
>> >> transaction will throw an exception, including begin.
>> >>
>> >> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
>> >>
>> >> As I stated in FLUME-1089, I think that when close is called it should
>> >> forcefully destroy the transaction like JDBC close(), but I have not
>> >> got much agreement.
>> >>
>> >> On Thu, Nov 15, 2012 at 5:24 AM, Andrew Jones <[email protected]> wrote:
>> >> > We are using Flume 1.2.0. We have a custom source, although it passes
>> >> > through an Avro Sink and Source before getting to the sink. We are now
>> >> > using the memory channel, although we had just switched from the JDBC
>> >> > channel when we started seeing these errors, so maybe that's something
>> >> > to do with it?
>> >> >
>> >> > I tried wrapping transaction.rollback(); in a try catch and logging in
>> >> > the catch, but it wasn't called, so I don't think the rollback is
>> >> > throwing an error.
>> >> >
>> >> > I think it may have something to do with switching channels, as right
>> >> > after Flume reloaded the config we started getting errors. I have
>> >> > restarted the flume node manually and we are still getting the error.
>> >> >
>> >> > Thanks,
>> >> > Andrew
>> >> >
>> >> > On 14 November 2012 20:02, Hari Shreedharan <[email protected]> wrote:
>> >> >>
>> >> >> Which version of Flume are you using? It looks like the transaction
>> >> >> was never rolled back or committed. It is likely that the rollback
>> >> >> method too threw some exception, and the rollback was not successful.
>> >> >> Also, what channel are you using?
>> >> >>
>> >> >> Thanks,
>> >> >> Hari
>> >> >>
>> >> >> --
>> >> >> Hari Shreedharan
>> >> >>
>> >> >> On Wednesday, November 14, 2012 at 8:55 AM, Andrew Jones wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I have a custom sink which has been working fine, but recently I have
>> >> >> started seeing this error in the logs:
>> >> >>
>> >> >> Unable to deliver event. Exception follows.
>> >> >> java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
>> >> >>     at com.google.common.base.Preconditions.checkState(Preconditions.java:176)
>> >> >>     ...
>> >> >>
>> >> >> After having a google and finding
>> >> >> https://issues.apache.org/jira/browse/FLUME-1089, I have double checked
>> >> >> I am using the correct try, catch, finally idiom that other sinks use,
>> >> >> and I seem to be doing the same. I do the following:
>> >> >>
>> >> >> public Status process() throws EventDeliveryException {
>> >> >>   Status status = Status.READY;
>> >> >>
>> >> >>   Channel channel = getChannel();
>> >> >>   Transaction transaction = channel.getTransaction();
>> >> >>
>> >> >>   try {
>> >> >>     transaction.begin();
>> >> >>
>> >> >>     // does a bit of processing and
>> >> >>     // writes out the event to MongoDB
>> >> >>
>> >> >>     transaction.commit();
>> >> >>
>> >> >>   } catch (Throwable t) {
>> >> >>     transaction.rollback();
>> >> >>
>> >> >>     if (t instanceof Error) {
>> >> >>       throw (Error) t;
>> >> >>     } else if (t instanceof EventDeliveryException) {
>> >> >>       throw (EventDeliveryException) t;
>> >> >>     } else if (t instanceof ChannelException) {
>> >> >>       logger.error("Brodie Log Sink " + getName() + ": Unable to get event from" +
>> >> >>           " channel " + channel.getName() + ". Exception follows.", t);
>> >> >>       status = Status.BACKOFF;
>> >> >>     } else {
>> >> >>       throw new EventDeliveryException("Failed to send events", t);
>> >> >>     }
>> >> >>   } finally {
>> >> >>     transaction.close();
>> >> >>   }
>> >> >>
>> >> >>   return status;
>> >> >> }
>> >> >>
>> >> >> }
>> >> >>
>> >> >> All of this code came from looking at other sinks (Avro and HDFS), so I
>> >> >> am pretty sure it's correct.
>> >> >>
>> >> >> Can anyone see anything that might be a problem, or is there anything
>> >> >> else I can do to avoid this error?
>> >> >>
>> >> >> Thanks,
>> >> >> Andrew
>> >> >>
>> >> >
>> >>
>> >> --
>> >> Apache MRUnit - Unit testing MapReduce -
>> >> http://incubator.apache.org/mrunit/
>> >
>>
>> --
>> Apache MRUnit - Unit testing MapReduce -
>> http://incubator.apache.org/mrunit/
>
--
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
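
For anyone finding this thread later, here is a rough, self-contained sketch of the point at the top of this message: when take() returns null, the transaction still has to be committed (or rolled back) before close(). This is not Andrew's sink and it does not use the Flume classes. `Tx`, `Chan`, and `processOnce` are hypothetical stand-ins that only mimic the state check seen in the stack trace; in a real sink they would be Flume's Transaction and Channel, and the status strings would be Status.READY and Status.BACKOFF.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class NullTakeSketch {

    /** Hypothetical stand-in for a transaction: close() rejects an OPEN transaction. */
    public static final class Tx {
        private String state = "NEW";
        public void begin()    { state = "OPEN"; }
        public void commit()   { state = "COMPLETED"; }
        public void rollback() { state = "COMPLETED"; }
        public void close() {
            if (state.equals("OPEN")) {
                throw new IllegalStateException(
                    "close() called when transaction is OPEN - "
                    + "you must either commit or rollback first");
            }
            state = "CLOSED";
        }
    }

    /** Hypothetical stand-in for a channel: take() returns null when it is empty. */
    public static final class Chan {
        private final Queue<String> events = new ArrayDeque<>();
        public void put(String event) { events.add(event); }
        public String take() { return events.poll(); }
    }

    /** One sink pass: returns "READY" if an event was delivered, "BACKOFF" if none was available. */
    public static String processOnce(Chan channel, StringBuilder delivered) {
        Tx tx = new Tx();
        String status = "READY";
        try {
            tx.begin();
            String event = channel.take();
            if (event != null) {
                delivered.append(event);   // stand-in for writing the event out
            } else {
                status = "BACKOFF";        // empty channel: nothing to deliver
            }
            tx.commit();                   // reached on BOTH paths, so close() is always legal
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        } finally {
            tx.close();
        }
        return status;
    }

    public static void main(String[] args) {
        Chan channel = new Chan();
        StringBuilder delivered = new StringBuilder();
        System.out.println(processOnce(channel, delivered)); // empty channel
        channel.put("event-1");
        System.out.println(processOnce(channel, delivered)); // one event available
    }
}
```

The thing to look for in your own sink is any early return or skipped branch between begin() and commit() on the null-event path; that leaves the transaction OPEN when the finally block calls close().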
