Mike, you have a memory leak in your implementation. Even message that fails completely is never cleaned up.
On Fri, Oct 2, 2009 at 5:43 PM, Mike Nichols <[email protected]>wrote: > Jason > Here is my module that works as you are trying to (I believe). Note I am > doing the send in the MessageCompleted handler,not in the messageFailure. > Ignore the lame method of configuration for the NumberOfRetries value...:) > public class ExceptionHandlingMessageModule : IMessageModule, > IBusConfigurationAware > { > public static int? NumberOfRetries; > private readonly Hashtable<string, ErrorCounter> failureCounts = > new Hashtable<string, ErrorCounter>(); > private ITransport transport; > > public void Configure(AbstractRhinoServiceBusFacility facility, > IConfiguration configuration) > { > if (!NumberOfRetries.HasValue) > NumberOfRetries = facility.NumberOfRetries; > } > > public void Init(ITransport transport) > { > this.transport = transport; > transport.MessageProcessingFailure += > TransportOnMessageProcessingFailure; > transport.MessageProcessingCompleted += > TransportOnMessageProcessingCompleted; > } > > public void Stop(ITransport transport) > { > transport.MessageProcessingFailure -= > TransportOnMessageProcessingFailure; > transport.MessageProcessingCompleted -= > TransportOnMessageProcessingCompleted; > } > > private void > TransportOnMessageProcessingCompleted(CurrentMessageInformation information, > Exception e) > { > ErrorCounter val = null; > failureCounts.Read(reader => > reader.TryGetValue(information.TransportMessageId, out val)); > if (val == null) > return; > if (val.FailureCount == NumberOfRetries) > { > var messageType = (information.Message ?? "no > message").ToString(); > // Log.For(this).Info("Sending error message for exception > '{0}'",e); > transport.Send(new Endpoint {Uri = information.Source}, > new[] > > { > > new ExceptionMessage > > { > > ErrorText = val.Exception.Message, > > ExceptionToString = val.Exception.ToString(), > > Timestamp = DateTime.Now, > > MessageType = messageType, > > MessageId = information.MessageId, > > Source = information.Source, > > Message = information.Message > > } > > }); > } > if (e == null) > { > failureCounts.Write(writer => > writer.Remove(information.TransportMessageId)); > } > } > > > private void > TransportOnMessageProcessingFailure(CurrentMessageInformation information, > Exception exception) > { > failureCounts.Write(writer => > { > ErrorCounter errorCounter; > if > (writer.TryGetValue(information.TransportMessageId, out errorCounter) == > false) > { > errorCounter = new ErrorCounter > { > > FailureCount = 0, > > Exception = exception > }; > > writer.Add(information.TransportMessageId, errorCounter); > } > errorCounter.FailureCount += 1; > }); > } > > private class ErrorCounter > { > public Exception Exception; > public int FailureCount; > > } > } > > > > > > On Fri, Oct 2, 2009 at 7:20 AM, Jason Meckley <[email protected]>wrote: > >> >> I use RQ for all my projects, but I'm not seeing any exceptions. in >> the code i provided originally I'm replying on the transport within >> the MessageProcessingFailure event after the maximum number of >> failures occurs. should I move this to MessageProcessingCompleted, >> checking the number of failed attempts to determine if a reply should >> be sent? >> >> On Oct 2, 10:09 am, Ayende Rahien <[email protected]> wrote: >> > And if this after the rollback, it also explains it. >> > Depending on what kind of queue you use. If you are using MSMQ, it will >> be >> > silently ignored. If you are using RQ, it will throw. >> > >> > On Fri, Oct 2, 2009 at 4:04 PM, Jason Meckley <[email protected] >> >wrote: >> > >> > >> > >> > > I originally got the idea from the LoadBalancerMessageModule. I will >> > > log some stats on Transaction.Current to see what's happening there. >> > >> > > if this is before rollback, then that would explain why the message is >> > > not sent, correct? >> > >> > > On Oct 2, 9:17 am, Ayende Rahien <[email protected]> wrote: >> > > > Jason, >> > > > I don't think that you can send messages from message modules. >> > > > The idea never occurred to me, to tell you the truth. >> > > > In this particular case, I can't recall off hand what the >> transaction >> > > state >> > > > if for this case, but I think it is just before we roll it back, or >> maybe >> > > > even without a transaction. >> > > > Both of those scenarios will explain why you aren't seeing the >> message >> > > being >> > > > sent. >> > >> > > > On Thu, Oct 1, 2009 at 8:37 PM, Jason Meckley < >> [email protected] >> > > >wrote: >> > >> > > > > I have ~2 dozen consumers that all handle the happy days scenario >> (no >> > > > > exceptions). I then have a message module which will handle the >> > > > > exceptions. the code my be easier to understand >> > >> > > > > public class OutOfRetriesMessageModule : IMessageModule >> > > > > { >> > > > > private readonly IReflection reflection; >> > > > > private readonly int numberOfRetries; >> > > > > private readonly Hashtable<Guid, int> failures; >> > >> > > > > private ITransport theTransport; >> > >> > > > > public OutOfRetriesMessageModule(int numberOfRetries, >> > > > > Hashtable<Guid, int> failures, IReflection reflection) >> > > > > { >> > > > > this.numberOfRetries = numberOfRetries; >> > > > > this.failures = failures; >> > > > > this.reflection = reflection; >> > > > > } >> > >> > > > > public void Init(ITransport transport) >> > > > > { >> > > > > theTransport = transport; >> > > > > transport.MessageProcessingFailure += >> > > > > message_processing_failure_send_failure_notice_if_necessary; >> > > > > transport.MessageProcessingCompleted += >> > > > > message_processing_completed_clear_failures_if_necessary; >> > > > > } >> > >> > > > > public void Stop(ITransport transport) >> > > > > { >> > > > > transport.MessageProcessingFailure -= >> > > > > message_processing_failure_send_failure_notice_if_necessary; >> > > > > transport.MessageProcessingCompleted -= >> > > > > message_processing_completed_clear_failures_if_necessary; >> > > > > } >> > >> > > > > private void >> > > > > message_processing_failure_send_failure_notice_if_necessary >> > > > > (CurrentMessageInformation information, Exception exception) >> > > > > { >> > > > > var messageId = information.MessageId; >> > > > > if (exception == null) return; >> > > > > if(!typeof >> > > > > (RequestToGenerateReportMessage).IsAssignableFrom >> > > > > (information.Message.GetType())) return; >> > >> > > > > increment_failed_attempts(messageId); >> > > > > if (!has_exceeded_number_of_retries(messageId)) return; >> > >> > > > > send_failure_notice(information); >> > > > > } >> > >> > > > > private void >> > > > > message_processing_completed_clear_failures_if_necessary >> > > > > (CurrentMessageInformation information, Exception exception) >> > > > > { >> > > > > if (exception != null) return; >> > > > > failures.Write(writer => writer.Remove >> > > > > (information.MessageId)); >> > > > > } >> > >> > > > > private void increment_failed_attempts(Guid messageId) >> > > > > { >> > > > > failures.Write(writer => >> > > > > { >> > > > > int attempts; >> > > > > writer.TryGetValue(messageId, >> out >> > > > > attempts); >> > > > > writer.Add(messageId, >> ++attempts); >> > > > > }); >> > > > > } >> > >> > > > > private bool has_exceeded_number_of_retries(Guid id) >> > > > > { >> > > > > var attempts = 0; >> > > > > failures.Read(reader => reader.TryGetValue(id, out >> > > > > attempts)); >> > > > > return attempts >= numberOfRetries; >> > > > > } >> > >> > > > > private void send_failure_notice(CurrentMessageInformation >> > > > > information) >> > > > > { >> > > > > var message = information.Message; >> > > > > var generatedReportFailed = new ReportGeneratedMessage >> > > > > { >> > > > > instance_id = >> > > > > reflection.parse_instance_id(message), >> > > > > date = reflection.parse_date >> > > > > (message), >> > > > > region_id = >> > > > > reflection.parse_region_id(message), >> > > > > report_id = >> > > > > reflection.parse_report_id(message), >> > > > > file_name = "", >> > > > > success = false >> > > > > }; >> > > > > theTransport.Reply(generatedReportFailed); >> > > > > } >> > > > > } >> > >> > > > > the problem is the system is not throwing an excpetion and I'm not >> > > > > consuming the generatedReportFailed message on the other end. if I >> > > > > move the send_failure_notice logic into the Consume() member of >> each >> > > > > consumer wrapped in a try/catch the workflow is processed >> correctly >> > > > > and I receive a failure notice on the other end. >> > >> > > > > it what i'm trying to do possible? send/reply a message from >> within >> > > > > the message module? it would seem I don't need to wrap this in a >> > > > > transactionscope because theTransport will handle that for me. the >> > > > > Transport implementation is RQ. >> > >> > > > > I also tried >> > > > > theTransport.Send(new EndPoint{Url=information.Source}, new >> object[] >> > > > > {generatedReportFailed}); >> > > > > but that was throwing timeout exceptions. (duration cannot be >> > > > > negative). or just not working at all. >> >> > > > > --~--~---------~--~----~------------~-------~--~----~ You received this message because you are subscribed to the Google Groups "Rhino Tools Dev" group. To post to this group, send email to [email protected] To unsubscribe from this group, send email to [email protected] For more options, visit this group at http://groups.google.com/group/rhino-tools-dev?hl=en -~----------~----~----~----~------~----~------~--~---
