Sure. Sink: http://pastebin.com/N6zh73hU Config: http://pastebin.com/Tc2MH9iV
Thanks. On 16 November 2012 15:32, Brock Noland <[email protected]> wrote: > Would you be able to send the source of your sink via pastbin in > addition to your config? > > On Fri, Nov 16, 2012 at 9:21 AM, Andrew Jones <[email protected]> > wrote: > > I tried logging the first throwable, but now that is just the > > IllegalStateException. > > > > Today I have been looking at Flume-1.3.0rc3 and I have noticed the same > > problem. This is using the Avro source, File channel and our custom sink. > > After Flume reloads its config, the first error message comes when the > Avro > > source starts up: > > > > 16 Nov 2012 16:04:25,237 INFO [lifecycleSupervisor-1-4] > > (org.apache.flume.source.AvroSource.start:142) - Starting Avro source > > source: { bindAddress: 0.0.0.0, port: 36060 }... > > 16 Nov 2012 16:04:25,484 ERROR > > [SinkRunner-PollingRunner-DefaultSinkProcessor] > > (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver > > event. Exception follows. > > java.lang.IllegalStateException: close() called when transaction is OPEN > - > > you must either commit or rollback first > > at > > com.google.common.base.Preconditions.checkState(Preconditions.java:176) > > at > > > org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179) > > at > > > com.arm.pd.brodie.flume.sink.ResultSink.processSingle(ResultSink.java:440) > > at > > com.arm.pd.brodie.flume.sink.ResultSink.process(ResultSink.java:172) > > at > > > org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) > > at > > org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) > > at java.lang.Thread.run(Thread.java:636) > > 16 Nov 2012 16:04:25,594 INFO [lifecycleSupervisor-1-4] > > (org.apache.flume.instrumentation.MonitoredCounterGroup.register:89) - > > Monitoried counter group for type: SOURCE, name: source, registered > > successfully. > > 16 Nov 2012 16:04:25,595 INFO [lifecycleSupervisor-1-4] > > (org.apache.flume.instrumentation.MonitoredCounterGroup.start:73) - > > Component type: SOURCE, name: source started > > 16 Nov 2012 16:04:25,595 INFO [lifecycleSupervisor-1-4] > > (org.apache.flume.source.AvroSource.start:168) - Avro source source > > started. > > > > I then continually get errors from the Sink, presumably as its been > called > > periodically to check for events in the channel. So is it possible its > the > > Avro source causing the issue? > > > > There should have been nothing persisted in the file channel when > > restarting. > > > > When the transaction gets messed up like this, is there a way to refresh > it, > > preferably without losing any data? > > > > I am still able to send things to flume and they get processed and > inserted > > by my sink, so it still seems to work OK. > > > > Thanks, > > Andrew > > > > > > > > On 15 November 2012 12:50, Brock Noland <[email protected]> wrote: > >> > >> Can you log the Throwable as the first thing in the catch block to see > >> if something and what it is, is being thrown? > >> > >> Transactions are thread local so if for some reason the the sequencing > >> gets messed up on an earlier call the process, every call to > >> transaction will thrown an exception including begin. > >> > >> > >> > https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java > >> > >> As I stated in FLUME-1089 I think that when close is called it should > >> forcefully destroy the transaction like JDBC close() but I have not > >> got much agreement. > >> > >> > >> On Thu, Nov 15, 2012 at 5:24 AM, Andrew Jones <[email protected]> > >> wrote: > >> > We are using Flume 1.2.0. We have a custom source, although it passes > >> > through an Avro Sink and Source before getting to the sink. We are now > >> > using > >> > the memory channel, although had just switched from the JDBC channel > >> > when we > >> > started seeing these errors, so maybe that's something to do with it? > >> > > >> > I tried wrapping transaction.rollback(); in a try catch and logging in > >> > the > >> > catch, but it wasn't called, so I don't think the rollback is throwing > >> > an > >> > error. > >> > > >> > I think it may have something to do with switching channels, as right > >> > after > >> > Flume reloaded the config we started getting errors. I have restarted > >> > the > >> > flume node manually and we are still getting the error. > >> > > >> > Thanks, > >> > Andrew > >> > > >> > > >> > On 14 November 2012 20:02, Hari Shreedharan < > [email protected]> > >> > wrote: > >> >> > >> >> Which version of Flume are you using? It looks like the transaction > was > >> >> never rolled back or committed. It is likely that the rollback method > >> >> too > >> >> threw some exception, and the rollback was not successful. Also, what > >> >> channel are you using? > >> >> > >> >> > >> >> Thanks, > >> >> Hari > >> >> > >> >> -- > >> >> Hari Shreedharan > >> >> > >> >> On Wednesday, November 14, 2012 at 8:55 AM, Andrew Jones wrote: > >> >> > >> >> Hi, > >> >> > >> >> I have a custom sink which has been working fine, but recently I have > >> >> started seeing this error in the logs: > >> >> > >> >> Unable to deliver event. Exception follows. > >> >> java.lang.IllegalStateException: close() called when transaction is > >> >> OPEN - > >> >> you must either commit or rollback first > >> >> at > >> >> > com.google.common.base.Preconditions.checkState(Preconditions.java:176) > >> >> ... > >> >> > >> >> > >> >> After having a google and finding > >> >> https://issues.apache.org/jira/browse/FLUME-1089, I have double > checked > >> >> I am > >> >> using the correct try, catch, finally idiom that other sinks use, > and I > >> >> seem > >> >> to be doing the same. I do the following: > >> >> > >> >> public Status process() throws EventDeliveryException { > >> >> Status status = Status.READY; > >> >> > >> >> Channel channel = getChannel(); > >> >> Transaction transaction = channel.getTransaction(); > >> >> > >> >> try { > >> >> transaction.begin(); > >> >> > >> >> // does a bit of processing and > >> >> // writes out the event to MongoDB > >> >> > >> >> transaction.commit(); > >> >> > >> >> } catch (Throwable t) { > >> >> transaction.rollback(); > >> >> > >> >> if (t instanceof Error) { > >> >> throw (Error) t; > >> >> } else if (t instanceof EventDeliveryException) { > >> >> throw (EventDeliveryException) t; > >> >> } else if (t instanceof ChannelException) { > >> >> logger.error("Brodie Log Sink " + getName() + ": Unable to get event > >> >> from" > >> >> + > >> >> " channel " + channel.getName() + ". Exception follows.", t); > >> >> status = Status.BACKOFF; > >> >> } else { > >> >> throw new EventDeliveryException("Failed to send events", t); > >> >> } > >> >> } finally { > >> >> transaction.close(); > >> >> } > >> >> > >> >> return status; > >> >> } > >> >> > >> >> } > >> >> > >> >> All of this code came from looking at other sinks (Avro and HDFS), > so I > >> >> am > >> >> pretty sure its correct. > >> >> > >> >> Can anyone see anything that might be a problem, or is there anything > >> >> else > >> >> I can do to avoid this error? > >> >> > >> >> Thanks, > >> >> Andrew > >> >> > >> >> > >> > > >> > >> > >> > >> -- > >> Apache MRUnit - Unit testing MapReduce - > >> http://incubator.apache.org/mrunit/ > > > > > > > > -- > Apache MRUnit - Unit testing MapReduce - > http://incubator.apache.org/mrunit/ >
