Thanks I'll look at that On Fri, Oct 2, 2009 at 9:44 AM, Ayende Rahien <[email protected]> wrote:
> Mike, you have a memory leak in your implementation. > Even message that fails completely is never cleaned up. > > > On Fri, Oct 2, 2009 at 5:43 PM, Mike Nichols <[email protected]>wrote: > >> Jason >> Here is my module that works as you are trying to (I believe). Note I am >> doing the send in the MessageCompleted handler,not in the messageFailure. >> Ignore the lame method of configuration for the NumberOfRetries value...:) >> public class ExceptionHandlingMessageModule : IMessageModule, >> IBusConfigurationAware >> { >> public static int? NumberOfRetries; >> private readonly Hashtable<string, ErrorCounter> failureCounts = >> new Hashtable<string, ErrorCounter>(); >> private ITransport transport; >> >> public void Configure(AbstractRhinoServiceBusFacility facility, >> IConfiguration configuration) >> { >> if (!NumberOfRetries.HasValue) >> NumberOfRetries = facility.NumberOfRetries; >> } >> >> public void Init(ITransport transport) >> { >> this.transport = transport; >> transport.MessageProcessingFailure += >> TransportOnMessageProcessingFailure; >> transport.MessageProcessingCompleted += >> TransportOnMessageProcessingCompleted; >> } >> >> public void Stop(ITransport transport) >> { >> transport.MessageProcessingFailure -= >> TransportOnMessageProcessingFailure; >> transport.MessageProcessingCompleted -= >> TransportOnMessageProcessingCompleted; >> } >> >> private void >> TransportOnMessageProcessingCompleted(CurrentMessageInformation information, >> Exception e) >> { >> ErrorCounter val = null; >> failureCounts.Read(reader => >> reader.TryGetValue(information.TransportMessageId, out val)); >> if (val == null) >> return; >> if (val.FailureCount == NumberOfRetries) >> { >> var messageType = (information.Message ?? "no >> message").ToString(); >> // Log.For(this).Info("Sending error message for exception >> '{0}'",e); >> transport.Send(new Endpoint {Uri = information.Source}, >> new[] >> >> { >> >> new ExceptionMessage >> >> { >> >> ErrorText = val.Exception.Message, >> >> ExceptionToString = val.Exception.ToString(), >> >> Timestamp = DateTime.Now, >> >> MessageType = messageType, >> >> MessageId = information.MessageId, >> >> Source = information.Source, >> >> Message = information.Message >> >> } >> >> }); >> } >> if (e == null) >> { >> failureCounts.Write(writer => >> writer.Remove(information.TransportMessageId)); >> } >> } >> >> >> private void >> TransportOnMessageProcessingFailure(CurrentMessageInformation information, >> Exception exception) >> { >> failureCounts.Write(writer => >> { >> ErrorCounter errorCounter; >> if >> (writer.TryGetValue(information.TransportMessageId, out errorCounter) == >> false) >> { >> errorCounter = new >> ErrorCounter >> { >> >> FailureCount = 0, >> >> Exception = exception >> }; >> >> writer.Add(information.TransportMessageId, errorCounter); >> } >> errorCounter.FailureCount += 1; >> }); >> } >> >> private class ErrorCounter >> { >> public Exception Exception; >> public int FailureCount; >> >> } >> } >> >> >> >> >> >> On Fri, Oct 2, 2009 at 7:20 AM, Jason Meckley <[email protected]>wrote: >> >>> >>> I use RQ for all my projects, but I'm not seeing any exceptions. in >>> the code i provided originally I'm replying on the transport within >>> the MessageProcessingFailure event after the maximum number of >>> failures occurs. should I move this to MessageProcessingCompleted, >>> checking the number of failed attempts to determine if a reply should >>> be sent? >>> >>> On Oct 2, 10:09 am, Ayende Rahien <[email protected]> wrote: >>> > And if this after the rollback, it also explains it. >>> > Depending on what kind of queue you use. If you are using MSMQ, it will >>> be >>> > silently ignored. If you are using RQ, it will throw. >>> > >>> > On Fri, Oct 2, 2009 at 4:04 PM, Jason Meckley <[email protected] >>> >wrote: >>> > >>> > >>> > >>> > > I originally got the idea from the LoadBalancerMessageModule. I will >>> > > log some stats on Transaction.Current to see what's happening there. >>> > >>> > > if this is before rollback, then that would explain why the message >>> is >>> > > not sent, correct? >>> > >>> > > On Oct 2, 9:17 am, Ayende Rahien <[email protected]> wrote: >>> > > > Jason, >>> > > > I don't think that you can send messages from message modules. >>> > > > The idea never occurred to me, to tell you the truth. >>> > > > In this particular case, I can't recall off hand what the >>> transaction >>> > > state >>> > > > if for this case, but I think it is just before we roll it back, or >>> maybe >>> > > > even without a transaction. >>> > > > Both of those scenarios will explain why you aren't seeing the >>> message >>> > > being >>> > > > sent. >>> > >>> > > > On Thu, Oct 1, 2009 at 8:37 PM, Jason Meckley < >>> [email protected] >>> > > >wrote: >>> > >>> > > > > I have ~2 dozen consumers that all handle the happy days scenario >>> (no >>> > > > > exceptions). I then have a message module which will handle the >>> > > > > exceptions. the code my be easier to understand >>> > >>> > > > > public class OutOfRetriesMessageModule : IMessageModule >>> > > > > { >>> > > > > private readonly IReflection reflection; >>> > > > > private readonly int numberOfRetries; >>> > > > > private readonly Hashtable<Guid, int> failures; >>> > >>> > > > > private ITransport theTransport; >>> > >>> > > > > public OutOfRetriesMessageModule(int numberOfRetries, >>> > > > > Hashtable<Guid, int> failures, IReflection reflection) >>> > > > > { >>> > > > > this.numberOfRetries = numberOfRetries; >>> > > > > this.failures = failures; >>> > > > > this.reflection = reflection; >>> > > > > } >>> > >>> > > > > public void Init(ITransport transport) >>> > > > > { >>> > > > > theTransport = transport; >>> > > > > transport.MessageProcessingFailure += >>> > > > > message_processing_failure_send_failure_notice_if_necessary; >>> > > > > transport.MessageProcessingCompleted += >>> > > > > message_processing_completed_clear_failures_if_necessary; >>> > > > > } >>> > >>> > > > > public void Stop(ITransport transport) >>> > > > > { >>> > > > > transport.MessageProcessingFailure -= >>> > > > > message_processing_failure_send_failure_notice_if_necessary; >>> > > > > transport.MessageProcessingCompleted -= >>> > > > > message_processing_completed_clear_failures_if_necessary; >>> > > > > } >>> > >>> > > > > private void >>> > > > > message_processing_failure_send_failure_notice_if_necessary >>> > > > > (CurrentMessageInformation information, Exception exception) >>> > > > > { >>> > > > > var messageId = information.MessageId; >>> > > > > if (exception == null) return; >>> > > > > if(!typeof >>> > > > > (RequestToGenerateReportMessage).IsAssignableFrom >>> > > > > (information.Message.GetType())) return; >>> > >>> > > > > increment_failed_attempts(messageId); >>> > > > > if (!has_exceeded_number_of_retries(messageId)) >>> return; >>> > >>> > > > > send_failure_notice(information); >>> > > > > } >>> > >>> > > > > private void >>> > > > > message_processing_completed_clear_failures_if_necessary >>> > > > > (CurrentMessageInformation information, Exception exception) >>> > > > > { >>> > > > > if (exception != null) return; >>> > > > > failures.Write(writer => writer.Remove >>> > > > > (information.MessageId)); >>> > > > > } >>> > >>> > > > > private void increment_failed_attempts(Guid messageId) >>> > > > > { >>> > > > > failures.Write(writer => >>> > > > > { >>> > > > > int attempts; >>> > > > > writer.TryGetValue(messageId, >>> out >>> > > > > attempts); >>> > > > > writer.Add(messageId, >>> ++attempts); >>> > > > > }); >>> > > > > } >>> > >>> > > > > private bool has_exceeded_number_of_retries(Guid id) >>> > > > > { >>> > > > > var attempts = 0; >>> > > > > failures.Read(reader => reader.TryGetValue(id, out >>> > > > > attempts)); >>> > > > > return attempts >= numberOfRetries; >>> > > > > } >>> > >>> > > > > private void send_failure_notice(CurrentMessageInformation >>> > > > > information) >>> > > > > { >>> > > > > var message = information.Message; >>> > > > > var generatedReportFailed = new ReportGeneratedMessage >>> > > > > { >>> > > > > instance_id = >>> > > > > reflection.parse_instance_id(message), >>> > > > > date = reflection.parse_date >>> > > > > (message), >>> > > > > region_id = >>> > > > > reflection.parse_region_id(message), >>> > > > > report_id = >>> > > > > reflection.parse_report_id(message), >>> > > > > file_name = "", >>> > > > > success = false >>> > > > > }; >>> > > > > theTransport.Reply(generatedReportFailed); >>> > > > > } >>> > > > > } >>> > >>> > > > > the problem is the system is not throwing an excpetion and I'm >>> not >>> > > > > consuming the generatedReportFailed message on the other end. if >>> I >>> > > > > move the send_failure_notice logic into the Consume() member of >>> each >>> > > > > consumer wrapped in a try/catch the workflow is processed >>> correctly >>> > > > > and I receive a failure notice on the other end. >>> > >>> > > > > it what i'm trying to do possible? send/reply a message from >>> within >>> > > > > the message module? it would seem I don't need to wrap this in a >>> > > > > transactionscope because theTransport will handle that for me. >>> the >>> > > > > Transport implementation is RQ. >>> > >>> > > > > I also tried >>> > > > > theTransport.Send(new EndPoint{Url=information.Source}, new >>> object[] >>> > > > > {generatedReportFailed}); >>> > > > > but that was throwing timeout exceptions. (duration cannot be >>> > > > > negative). or just not working at all. >>> >>> >> >> >> > > > > --~--~---------~--~----~------------~-------~--~----~ You received this message because you are subscribed to the Google Groups "Rhino Tools Dev" group. To post to this group, send email to [email protected] To unsubscribe from this group, send email to [email protected] For more options, visit this group at http://groups.google.com/group/rhino-tools-dev?hl=en -~----------~----~----~----~------~----~------~--~---
