I have a service that will automatically transfer data from one db to 
another. I'm using Rhino.ETL for the data transfer. I'm using RSB with RQ 
as the mechanism to keep the ETL running. conceptually it works likes this.

1. install the app as a service using RSB Hosting. during install send a 
message to run the ETL process.
2. start sevice, which will consume the message to run the ETL Process.
3. consuming the message will
    3.1. run the ETL process
    3.2. check for additional transfers that may have arrived
           3.2.1. If another transfer exists, immediately send a message to 
run the ETL process.
           3.2.2. if no additional transfers are pending, send a message in 
10 minutes to run the ETL process.

as part of the ETL operations I implemented my own 
ConventionInput/OutputCommands to not use transactions and instead rely on 
the SystemTransaction opened by RhinoQueues.

I also implemented a message module to verify the databases are available.
public class VerifyDatabaseAvailability
    : WithLoggingMixin
      , IMessageModule
{
    private IServiceBus theBus;

    public Action TestDatabaseAvailability { get; set; }

    public VerifyDatabaseAvailability()
    {
        TestDatabaseAvailability = () =>
                                       {
                                           using (Use.Connection("source"))
                                           using 
(Use.Connection("destination"))
                                           {
                                           }
                                       };
    }

    public void Init(ITransport transport, IServiceBus bus)
    {
        theBus = bus;
        transport.MessageArrived += CanConnectToDatabase;
    }

    public void Stop(ITransport transport, IServiceBus bus)
    {
        transport.MessageArrived -= CanConnectToDatabase;
    }

    private bool CanConnectToDatabase(CurrentMessageInformation arg)
    {
        try
        {
            TestDatabaseAvailability();
            return false;
        }
        catch
        {
            var oneHourFromNow = SystemTime.Now().AddHours(1);
            theBus.DelaySend(oneHourFromNow, arg.AllMessages);

            Info("Databases are not available. Will try again on {0:F}", 
oneHourFromNow);

            return true;
        }
    }
}

this handles the situations where either database may be offline for 
maintenance. I know this is working because I see evidence of this in the 
logs when we take the databases offline. However there are some instances 
where a TransactionAbortedException or SqlException ('Timeout expired' or 
'A transport-level error') is thrown. When this occurs RSB makes 3 attempts 
to consume the message, which is the default behavior. The message cannot 
be consumed due to the exception and is moved to the error queue. Once in 
the error queue the system will stop receiving messages to run the ETL 
process. The service is running, but there are not messages being sent.

TransactionAbortedException throw within the message module above
Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport [(null)] - Failed to 
process message
System.Transactions.TransactionAbortedException: The transaction has 
aborted.
at 
System.Transactions.TransactionStatePromotedAborted.CreateAbortingClone(InternalTransaction
 
tx)
at System.Transactions.DependentTransaction..ctor(IsolationLevel isoLevel, 
InternalTransaction internalTransaction, Boolean blocking)
at System.Transactions.Transaction.DependentClone(DependentCloneOption 
cloneOption)
at System.Transactions.TransactionScope.SetCurrent(Transaction newCurrent)
at System.Transactions.TransactionScope.PushScope()
at System.Transactions.TransactionScope..ctor(TransactionScopeOption 
scopeOption, TransactionOptions transactionOptions)
at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.SendInternal(Object[] 
msgs, Endpoint destination, Action`1 customizeHeaders)
at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.Send(Endpoint 
endpoint, DateTime processAgainAt, Object[] msgs)
at Rhino.ServiceBus.Impl.DefaultServiceBus.DelaySend(DateTime time, 
Object[] msgs)
at 
MyProject.VerifyDatabaseAvailability.CanConnectToDatabase(CurrentMessageInformation
 
arg)
at 
Rhino.ServiceBus.Transport.TransportUtil.ProcessSingleMessage(CurrentMessageInformation
 
currentMessageInformation, Func`2 messageRecieved)
at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.ProcessMessage(Message 
message, TransactionScope tx, Func`2 messageRecieved, Action`2 
messageCompleted, Action`1 beforeTransactionCommit)

And Sql exceptions occur within the ETL process
Rhino.Etl.Core.Pipelines.SingleThreadedPipelineExecuter [(null)] - Failed 
to execute operation 
System.Data.SqlClient.SqlException (0x80131904): A transport-level error 
has occurred when receiving results from the server. (provider: TCP 
Provider, error: 0 - The specified network name is no longer available.)
   at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, 
Boolean breakConnection)
   at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException 
exception, Boolean breakConnection)
   at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning()
   at 
System.Data.SqlClient.TdsParserStateObject.ReadSniError(TdsParserStateObject 
stateObj, UInt32 error)
   at System.Data.SqlClient.TdsParserStateObject.ReadSni(DbAsyncResult 
asyncResult, TdsParserStateObject stateObj)
   at System.Data.SqlClient.TdsParserStateObject.ReadNetworkPacket()
   at System.Data.SqlClient.TdsParserStateObject.ReadBuffer()
   at System.Data.SqlClient.TdsParserStateObject.ReadByte()
   at System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, 
SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet 
bulkCopyHandler, TdsParserStateObject stateObj)
   at System.Data.SqlClient.SqlCommand.RunExecuteNonQueryTds(String 
methodName, Boolean async)
   at 
System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(DbAsyncResult 
result, String methodName, Boolean sendToPipe)
   at System.Data.SqlClient.SqlCommand.ExecuteNonQuery()
   at MyProject.DatabaseExtensions.SetArithabortOn(IDbConnection connection)
   at MyProject.MyOutputOperation.<Execute>d__0.MoveNext()
   at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
   at Rhino.Etl.Core.Operations.BranchingOperation.<Execute>d__2.MoveNext()
   at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
   at Rhino.Etl.Core.Operations.BranchingOperation.<Execute>d__2.MoveNext()
   at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
   at 
Rhino.Etl.Core.Pipelines.AbstractPipelineExecuter.ExecutePipeline(IEnumerable`1 
pipeline)

I receive logs that inform me of the errors, I then run a reboot script 
which re-installs the service. It's a brute force approach, but it gets the 
service up and running.

I would like to add better error handling to account for the 
TransactionAbortedException and SqlException and tell RSB to send a message 
in 1 hour to try again. Conceptually I want to continue to send messages at 
a later time even after the message completely fails.

How can I handle this scenario with RSB? Or should I remove ESB and handle 
the continued running differently?

-- 
You received this message because you are subscribed to the Google Groups 
"Rhino Tools Dev" group.
To view this discussion on the web visit 
https://groups.google.com/d/msg/rhino-tools-dev/-/J3vaQKSgLGkJ.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to 
[email protected].
For more options, visit this group at 
http://groups.google.com/group/rhino-tools-dev?hl=en.

Reply via email to