Thanks, I'll look at that.
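
On the leak: one way to close it would be to drop the entry as soon as the retries are used up, not only when a message finally succeeds. Below is a minimal sketch of just that bookkeeping, with a plain Dictionary and a lock standing in for the Rhino Hashtable. FailureTracker, RecordFailure and Complete are hypothetical names, not part of Rhino Service Bus, and the "send only when the last attempt also failed" rule is an assumption about the intended behavior.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the module's failureCounts table.
// Not part of Rhino Service Bus; it only illustrates the cleanup rule.
public sealed class FailureTracker
{
    private readonly object gate = new object();
    private readonly Dictionary<string, int> counts = new Dictionary<string, int>();
    private readonly int numberOfRetries;

    public FailureTracker(int numberOfRetries)
    {
        this.numberOfRetries = numberOfRetries;
    }

    // Number of message ids currently being tracked.
    public int TrackedCount
    {
        get { lock (gate) { return counts.Count; } }
    }

    // Call from the failure handler: bumps the count for this message id.
    public void RecordFailure(string messageId)
    {
        lock (gate)
        {
            int current;
            counts.TryGetValue(messageId, out current); // current stays 0 if absent
            counts[messageId] = current + 1;
        }
    }

    // Call from the completed handler. Returns true when the caller should
    // send the error notice. The entry is removed on success and also once
    // the retries are exhausted, so a message that fails completely does not
    // stay in the table forever.
    public bool Complete(string messageId, Exception error)
    {
        lock (gate)
        {
            int failures;
            if (!counts.TryGetValue(messageId, out failures))
                return false;

            bool exhausted = error != null && failures >= numberOfRetries;
            if (error == null || exhausted)
                counts.Remove(messageId);
            return exhausted;
        }
    }
}
```

Complete would be called from the MessageProcessingCompleted handler with information.TransportMessageId, and the error message sent only when it returns true.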

On Fri, Oct 2, 2009 at 9:44 AM, Ayende Rahien <[email protected]> wrote:

> Mike, you have a memory leak in your implementation.
> Every message that fails completely is never cleaned up.
>
>
> On Fri, Oct 2, 2009 at 5:43 PM, Mike Nichols <[email protected]> wrote:
>
>> Jason,
>> Here is my module, which I believe does what you are trying to do. Note that
>> I do the send in the MessageProcessingCompleted handler, not in the
>> MessageProcessingFailure handler.
>> Ignore the lame method of configuration for the NumberOfRetries value... :)
>> public class ExceptionHandlingMessageModule : IMessageModule, IBusConfigurationAware
>> {
>>     public static int? NumberOfRetries;
>>     private readonly Hashtable<string, ErrorCounter> failureCounts = new Hashtable<string, ErrorCounter>();
>>     private ITransport transport;
>>
>>     public void Configure(AbstractRhinoServiceBusFacility facility, IConfiguration configuration)
>>     {
>>         if (!NumberOfRetries.HasValue)
>>             NumberOfRetries = facility.NumberOfRetries;
>>     }
>>
>>     public void Init(ITransport transport)
>>     {
>>         this.transport = transport;
>>         transport.MessageProcessingFailure += TransportOnMessageProcessingFailure;
>>         transport.MessageProcessingCompleted += TransportOnMessageProcessingCompleted;
>>     }
>>
>>     public void Stop(ITransport transport)
>>     {
>>         transport.MessageProcessingFailure -= TransportOnMessageProcessingFailure;
>>         transport.MessageProcessingCompleted -= TransportOnMessageProcessingCompleted;
>>     }
>>
>>     private void TransportOnMessageProcessingCompleted(CurrentMessageInformation information, Exception e)
>>     {
>>         ErrorCounter val = null;
>>         failureCounts.Read(reader => reader.TryGetValue(information.TransportMessageId, out val));
>>         if (val == null)
>>             return;
>>         if (val.FailureCount == NumberOfRetries)
>>         {
>>             var messageType = (information.Message ?? "no message").ToString();
>>             // Log.For(this).Info("Sending error message for exception '{0}'", e);
>>             transport.Send(new Endpoint {Uri = information.Source}, new[]
>>             {
>>                 new ExceptionMessage
>>                 {
>>                     ErrorText = val.Exception.Message,
>>                     ExceptionToString = val.Exception.ToString(),
>>                     Timestamp = DateTime.Now,
>>                     MessageType = messageType,
>>                     MessageId = information.MessageId,
>>                     Source = information.Source,
>>                     Message = information.Message
>>                 }
>>             });
>>         }
>>         if (e == null)
>>         {
>>             failureCounts.Write(writer => writer.Remove(information.TransportMessageId));
>>         }
>>     }
>>
>>     private void TransportOnMessageProcessingFailure(CurrentMessageInformation information, Exception exception)
>>     {
>>         failureCounts.Write(writer =>
>>         {
>>             ErrorCounter errorCounter;
>>             if (writer.TryGetValue(information.TransportMessageId, out errorCounter) == false)
>>             {
>>                 errorCounter = new ErrorCounter
>>                 {
>>                     FailureCount = 0,
>>                     Exception = exception
>>                 };
>>                 writer.Add(information.TransportMessageId, errorCounter);
>>             }
>>             errorCounter.FailureCount += 1;
>>         });
>>     }
>>
>>     private class ErrorCounter
>>     {
>>         public Exception Exception;
>>         public int FailureCount;
>>     }
>> }
>>
>>
>>
>>
>>
>> On Fri, Oct 2, 2009 at 7:20 AM, Jason Meckley <[email protected]> wrote:
>>
>>>
>>> I use RQ for all my projects, but I'm not seeing any exceptions. In
>>> the code I provided originally, I'm replying on the transport within
>>> the MessageProcessingFailure event after the maximum number of
>>> failures occurs. Should I move this to MessageProcessingCompleted,
>>> checking the number of failed attempts to determine if a reply should
>>> be sent?
>>>
>>> On Oct 2, 10:09 am, Ayende Rahien <[email protected]> wrote:
>>> > And if this is after the rollback, it also explains it.
>>> > It depends on what kind of queue you use. If you are using MSMQ, it will be
>>> > silently ignored. If you are using RQ, it will throw.
>>> >
>>> > On Fri, Oct 2, 2009 at 4:04 PM, Jason Meckley <[email protected]> wrote:
>>> >
>>> >
>>> >
>>> > > I originally got the idea from the LoadBalancerMessageModule. I will
>>> > > log some stats on Transaction.Current to see what's happening there.
>>> >
>>> > > If this is before the rollback, then that would explain why the message
>>> > > is not sent, correct?
>>> >
>>> > > On Oct 2, 9:17 am, Ayende Rahien <[email protected]> wrote:
>>> > > > Jason,
>>> > > > I don't think that you can send messages from message modules.
>>> > > > The idea never occurred to me, to tell you the truth.
>>> > > > In this particular case, I can't recall offhand what the transaction
>>> > > > state is, but I think it is just before we roll it back, or maybe
>>> > > > even without a transaction.
>>> > > > Either of those scenarios would explain why you aren't seeing the
>>> > > > message being sent.
>>> >
>>> > > > On Thu, Oct 1, 2009 at 8:37 PM, Jason Meckley <[email protected]> wrote:
>>> >
>>> > > > > I have ~2 dozen consumers that all handle the happy-day scenario (no
>>> > > > > exceptions). I then have a message module which will handle the
>>> > > > > exceptions. The code may be easier to understand:
>>> >
>>> > > > >    public class OutOfRetriesMessageModule : IMessageModule
>>> > > > >    {
>>> > > > >        private readonly IReflection reflection;
>>> > > > >        private readonly int numberOfRetries;
>>> > > > >        private readonly Hashtable<Guid, int> failures;
>>> > > > >
>>> > > > >        private ITransport theTransport;
>>> > > > >
>>> > > > >        public OutOfRetriesMessageModule(int numberOfRetries, Hashtable<Guid, int> failures, IReflection reflection)
>>> > > > >        {
>>> > > > >            this.numberOfRetries = numberOfRetries;
>>> > > > >            this.failures = failures;
>>> > > > >            this.reflection = reflection;
>>> > > > >        }
>>> > > > >
>>> > > > >        public void Init(ITransport transport)
>>> > > > >        {
>>> > > > >            theTransport = transport;
>>> > > > >            transport.MessageProcessingFailure += message_processing_failure_send_failure_notice_if_necessary;
>>> > > > >            transport.MessageProcessingCompleted += message_processing_completed_clear_failures_if_necessary;
>>> > > > >        }
>>> > > > >
>>> > > > >        public void Stop(ITransport transport)
>>> > > > >        {
>>> > > > >            transport.MessageProcessingFailure -= message_processing_failure_send_failure_notice_if_necessary;
>>> > > > >            transport.MessageProcessingCompleted -= message_processing_completed_clear_failures_if_necessary;
>>> > > > >        }
>>> > > > >
>>> > > > >        private void message_processing_failure_send_failure_notice_if_necessary(CurrentMessageInformation information, Exception exception)
>>> > > > >        {
>>> > > > >            var messageId = information.MessageId;
>>> > > > >            if (exception == null) return;
>>> > > > >            if (!typeof(RequestToGenerateReportMessage).IsAssignableFrom(information.Message.GetType())) return;
>>> > > > >
>>> > > > >            increment_failed_attempts(messageId);
>>> > > > >            if (!has_exceeded_number_of_retries(messageId)) return;
>>> > > > >
>>> > > > >            send_failure_notice(information);
>>> > > > >        }
>>> > > > >
>>> > > > >        private void message_processing_completed_clear_failures_if_necessary(CurrentMessageInformation information, Exception exception)
>>> > > > >        {
>>> > > > >            if (exception != null) return;
>>> > > > >            failures.Write(writer => writer.Remove(information.MessageId));
>>> > > > >        }
>>> > > > >
>>> > > > >        private void increment_failed_attempts(Guid messageId)
>>> > > > >        {
>>> > > > >            failures.Write(writer =>
>>> > > > >            {
>>> > > > >                int attempts;
>>> > > > >                writer.TryGetValue(messageId, out attempts);
>>> > > > >                writer.Add(messageId, ++attempts);
>>> > > > >            });
>>> > > > >        }
>>> > > > >
>>> > > > >        private bool has_exceeded_number_of_retries(Guid id)
>>> > > > >        {
>>> > > > >            var attempts = 0;
>>> > > > >            failures.Read(reader => reader.TryGetValue(id, out attempts));
>>> > > > >            return attempts >= numberOfRetries;
>>> > > > >        }
>>> > > > >
>>> > > > >        private void send_failure_notice(CurrentMessageInformation information)
>>> > > > >        {
>>> > > > >            var message = information.Message;
>>> > > > >            var generatedReportFailed = new ReportGeneratedMessage
>>> > > > >            {
>>> > > > >                instance_id = reflection.parse_instance_id(message),
>>> > > > >                date = reflection.parse_date(message),
>>> > > > >                region_id = reflection.parse_region_id(message),
>>> > > > >                report_id = reflection.parse_report_id(message),
>>> > > > >                file_name = "",
>>> > > > >                success = false
>>> > > > >            };
>>> > > > >            theTransport.Reply(generatedReportFailed);
>>> > > > >        }
>>> > > > >    }
>>> >
>>> > > > > The problem is that the system is not throwing an exception and I'm not
>>> > > > > consuming the generatedReportFailed message on the other end. If I
>>> > > > > move the send_failure_notice logic into the Consume() member of each
>>> > > > > consumer, wrapped in a try/catch, the workflow is processed correctly
>>> > > > > and I receive a failure notice on the other end.
>>> >
>>> > > > > Is what I'm trying to do possible: send/reply a message from within
>>> > > > > the message module? It would seem I don't need to wrap this in a
>>> > > > > TransactionScope because theTransport will handle that for me. The
>>> > > > > transport implementation is RQ.
>>> >
>>> > > > > I also tried
>>> > > > > theTransport.Send(new EndPoint{Url=information.Source}, new object[] {generatedReportFailed});
>>> > > > > but that was throwing timeout exceptions (duration cannot be
>>> > > > > negative), or just not working at all.

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"Rhino Tools Dev" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to 
[email protected]
For more options, visit this group at 
http://groups.google.com/group/rhino-tools-dev?hl=en
-~----------~----~----~----~------~----~------~--~---
